Compare commits
2 Commits
wmill-scri
...
feat/sandb
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9942bb748c | ||
|
|
430622261f |
35
backend/Cargo.lock
generated
35
backend/Cargo.lock
generated
@@ -15899,6 +15899,7 @@ dependencies = [
|
||||
"windmill-api-jobs",
|
||||
"windmill-api-npm-proxy",
|
||||
"windmill-api-openapi",
|
||||
"windmill-api-sandbox",
|
||||
"windmill-api-schedule",
|
||||
"windmill-api-scripts",
|
||||
"windmill-api-settings",
|
||||
@@ -16229,6 +16230,23 @@ dependencies = [
|
||||
"windmill-trigger-http",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windmill-api-sandbox"
|
||||
version = "1.638.2"
|
||||
dependencies = [
|
||||
"axum 0.7.9",
|
||||
"chrono",
|
||||
"reqwest 0.13.1",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sqlx",
|
||||
"tracing",
|
||||
"uuid",
|
||||
"windmill-api-auth",
|
||||
"windmill-common",
|
||||
"windmill-sandbox",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windmill-api-schedule"
|
||||
version = "1.638.2"
|
||||
@@ -16979,6 +16997,22 @@ dependencies = [
|
||||
"windmill-queue",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windmill-sandbox"
|
||||
version = "1.638.2"
|
||||
dependencies = [
|
||||
"axum 0.7.9",
|
||||
"chrono",
|
||||
"nix 0.27.1",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sqlx",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"uuid",
|
||||
"windmill-common",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "windmill-sql-datatype-parser-wasm"
|
||||
version = "1.638.2"
|
||||
@@ -17437,6 +17471,7 @@ dependencies = [
|
||||
"windmill-parser-yaml",
|
||||
"windmill-queue",
|
||||
"windmill-runtime-nativets",
|
||||
"windmill-sandbox",
|
||||
"yaml-rust",
|
||||
]
|
||||
|
||||
|
||||
@@ -37,9 +37,11 @@ members = [
|
||||
"./windmill-api-inputs",
|
||||
"./windmill-api-npm-proxy",
|
||||
"./windmill-api-openapi",
|
||||
"./windmill-api-sandbox",
|
||||
"./windmill-api-schedule",
|
||||
"./windmill-api-settings",
|
||||
"./windmill-api-workers",
|
||||
"./windmill-sandbox",
|
||||
"./windmill-store",
|
||||
"./windmill-queue",
|
||||
"./windmill-worker",
|
||||
@@ -305,9 +307,11 @@ windmill-api-flow-conversations = { path = "./windmill-api-flow-conversations" }
|
||||
windmill-api-inputs = { path = "./windmill-api-inputs" }
|
||||
windmill-api-npm-proxy = { path = "./windmill-api-npm-proxy" }
|
||||
windmill-api-openapi = { path = "./windmill-api-openapi" }
|
||||
windmill-api-sandbox = { path = "./windmill-api-sandbox" }
|
||||
windmill-api-schedule = { path = "./windmill-api-schedule" }
|
||||
windmill-api-settings = { path = "./windmill-api-settings" }
|
||||
windmill-api-workers = { path = "./windmill-api-workers" }
|
||||
windmill-sandbox = { path = "./windmill-sandbox" }
|
||||
windmill-store = { path = "./windmill-store" }
|
||||
windmill-parser = { path = "./parsers/windmill-parser" }
|
||||
windmill-parser-ts = { path = "./parsers/windmill-parser-ts" }
|
||||
|
||||
@@ -1 +1 @@
|
||||
931813b75b8260faa13ddc07f36a11607b7e3bf6
|
||||
592848d59ca2304926fb2bd85d000668a7f46a77
|
||||
|
||||
4
backend/migrations/20260217000000_sandbox.down.sql
Normal file
4
backend/migrations/20260217000000_sandbox.down.sql
Normal file
@@ -0,0 +1,4 @@
|
||||
-- Revert the sandbox schema.
-- Objects are dropped in reverse dependency order: sandbox_exec references
-- sandbox, sandbox references sandbox_host, and sandbox.status uses the
-- sandbox_status enum type.
DROP TABLE IF EXISTS sandbox_exec;
DROP TABLE IF EXISTS sandbox;
DROP TABLE IF EXISTS sandbox_host;
DROP TYPE  IF EXISTS sandbox_status;
|
||||
69
backend/migrations/20260217000000_sandbox.up.sql
Normal file
69
backend/migrations/20260217000000_sandbox.up.sql
Normal file
@@ -0,0 +1,69 @@
|
||||
-- Lifecycle states a sandbox row can be in.
CREATE TYPE sandbox_status AS ENUM ('creating', 'running', 'suspended', 'stopped', 'error');

-- Hosts that run remote sandboxes; `last_ping` is compared against now() by
-- the cleanup job to detect unreachable hosts.
CREATE TABLE sandbox_host (
    id            VARCHAR(255) PRIMARY KEY,
    base_url      TEXT         NOT NULL,
    last_ping     TIMESTAMPTZ  NOT NULL DEFAULT now(),
    capacity      INT          NOT NULL DEFAULT 10,
    active_count  INT          NOT NULL DEFAULT 0,
    labels        JSONB        NOT NULL DEFAULT '{}'
);

-- One row per sandbox.
CREATE TABLE sandbox (
    -- Identity and ownership
    id                    UUID           PRIMARY KEY DEFAULT gen_random_uuid(),
    workspace_id          VARCHAR(50)    NOT NULL REFERENCES workspace(id),
    created_by            VARCHAR(55)    NOT NULL,
    created_at            TIMESTAMPTZ    NOT NULL DEFAULT now(),

    -- Requested configuration
    image                 TEXT,
    timeout_secs          INT,
    idle_timeout_secs     INT,
    cpu_limit             INT            NOT NULL DEFAULT 1,
    memory_limit_mb       INT            NOT NULL DEFAULT 512,
    disk_limit_mb         INT            NOT NULL DEFAULT 1024,
    env_vars              JSONB          NOT NULL DEFAULT '{}',
    labels                JSONB          NOT NULL DEFAULT '{}',
    mounts                JSONB          NOT NULL DEFAULT '[]',
    network_enabled       BOOLEAN        NOT NULL DEFAULT false,

    -- Placement
    mode                  VARCHAR(20)    NOT NULL DEFAULT 'embedded',
    parent_job_id         UUID,
    host_id               VARCHAR(255)   REFERENCES sandbox_host(id),

    -- Runtime state
    status                sandbox_status NOT NULL DEFAULT 'creating',
    pid                   INT,
    started_at            TIMESTAMPTZ,
    last_activity_at      TIMESTAMPTZ,
    suspended_at          TIMESTAMPTZ,
    stopped_at            TIMESTAMPTZ,
    error_message         TEXT,

    -- Retention
    ephemeral             BOOLEAN        NOT NULL DEFAULT false,
    auto_stop_after_secs  INT,
    expires_at            TIMESTAMPTZ
);

-- Listing by workspace, optionally filtered by status.
CREATE INDEX idx_sandbox_workspace_status ON sandbox(workspace_id, status);
-- Partial index: only live sandboxes are indexed by host.
CREATE INDEX idx_sandbox_host ON sandbox(host_id) WHERE status IN ('running', 'suspended');
-- Partial index: only sandboxes that have an expiry are indexed.
CREATE INDEX idx_sandbox_expires ON sandbox(expires_at) WHERE expires_at IS NOT NULL;

-- History of commands executed in a sandbox; rows are removed together with
-- their sandbox (ON DELETE CASCADE).
CREATE TABLE sandbox_exec (
    id            UUID         PRIMARY KEY DEFAULT gen_random_uuid(),
    sandbox_id    UUID         NOT NULL REFERENCES sandbox(id) ON DELETE CASCADE,
    workspace_id  VARCHAR(50)  NOT NULL,

    -- What was run
    command       TEXT         NOT NULL,
    cwd           TEXT,
    env_vars      JSONB,

    -- Outcome
    started_at    TIMESTAMPTZ  NOT NULL DEFAULT now(),
    completed_at  TIMESTAMPTZ,
    exit_code     INT,
    stdout        TEXT,
    stderr        TEXT,
    duration_ms   BIGINT,

    created_by    VARCHAR(55)  NOT NULL
);

-- Most-recent-first listing of a sandbox's executions.
CREATE INDEX idx_sandbox_exec_sandbox ON sandbox_exec(sandbox_id, started_at DESC);
|
||||
@@ -1004,6 +1004,77 @@ pub async fn delete_expired_items(db: &DB) -> () {
|
||||
),
|
||||
}
|
||||
|
||||
// Sandbox cleanup: terminate expired sandboxes, stop idle ones, delete ephemeral stopped sandboxes
|
||||
let sandbox_expired = sqlx::query_scalar!(
|
||||
"UPDATE sandbox SET status = 'stopped'::sandbox_status, stopped_at = now()
|
||||
WHERE status IN ('running', 'suspended', 'creating')
|
||||
AND expires_at IS NOT NULL AND expires_at <= now()
|
||||
RETURNING id"
|
||||
)
|
||||
.fetch_all(db)
|
||||
.await;
|
||||
match sandbox_expired {
|
||||
Ok(ids) if !ids.is_empty() => {
|
||||
tracing::info!("stopped {} expired sandboxes", ids.len());
|
||||
}
|
||||
Err(e) => tracing::error!("Error stopping expired sandboxes: {e}"),
|
||||
_ => {}
|
||||
}
|
||||
|
||||
let sandbox_idle_stopped = sqlx::query_scalar!(
|
||||
"UPDATE sandbox SET status = 'stopped'::sandbox_status, stopped_at = now()
|
||||
WHERE status = 'running'
|
||||
AND idle_timeout_secs IS NOT NULL
|
||||
AND last_activity_at IS NOT NULL
|
||||
AND last_activity_at + (idle_timeout_secs::text || ' seconds')::interval <= now()
|
||||
RETURNING id"
|
||||
)
|
||||
.fetch_all(db)
|
||||
.await;
|
||||
match sandbox_idle_stopped {
|
||||
Ok(ids) if !ids.is_empty() => {
|
||||
tracing::info!("stopped {} idle sandboxes", ids.len());
|
||||
}
|
||||
Err(e) => tracing::error!("Error stopping idle sandboxes: {e}"),
|
||||
_ => {}
|
||||
}
|
||||
|
||||
let sandbox_ephemeral_deleted = sqlx::query_scalar!(
|
||||
"DELETE FROM sandbox WHERE ephemeral = true AND status IN ('stopped', 'error')
|
||||
RETURNING id"
|
||||
)
|
||||
.fetch_all(db)
|
||||
.await;
|
||||
match sandbox_ephemeral_deleted {
|
||||
Ok(ids) if !ids.is_empty() => {
|
||||
tracing::info!("deleted {} ephemeral stopped sandboxes", ids.len());
|
||||
}
|
||||
Err(e) => tracing::error!("Error deleting ephemeral sandboxes: {e}"),
|
||||
_ => {}
|
||||
}
|
||||
|
||||
// Mark sandboxes on dead hosts as errored
|
||||
let sandbox_orphaned = sqlx::query_scalar!(
|
||||
"UPDATE sandbox SET status = 'error'::sandbox_status,
|
||||
error_message = 'Sandbox host became unreachable'
|
||||
WHERE status IN ('running', 'suspended')
|
||||
AND mode = 'remote'
|
||||
AND host_id IS NOT NULL
|
||||
AND host_id NOT IN (
|
||||
SELECT id FROM sandbox_host WHERE last_ping > now() - interval '2 minutes'
|
||||
)
|
||||
RETURNING id"
|
||||
)
|
||||
.fetch_all(db)
|
||||
.await;
|
||||
match sandbox_orphaned {
|
||||
Ok(ids) if !ids.is_empty() => {
|
||||
tracing::info!("marked {} orphaned sandboxes as error", ids.len());
|
||||
}
|
||||
Err(e) => tracing::error!("Error marking orphaned sandboxes: {e}"),
|
||||
_ => {}
|
||||
}
|
||||
|
||||
let job_retention_secs = *JOB_RETENTION_SECS.read().await;
|
||||
if job_retention_secs > 0 {
|
||||
let batch_size = *JOB_CLEANUP_BATCH_SIZE;
|
||||
|
||||
22
backend/windmill-api-sandbox/Cargo.toml
Normal file
22
backend/windmill-api-sandbox/Cargo.toml
Normal file
@@ -0,0 +1,22 @@
|
||||
[package]
|
||||
name = "windmill-api-sandbox"
|
||||
version.workspace = true
|
||||
authors.workspace = true
|
||||
edition.workspace = true
|
||||
|
||||
[lib]
|
||||
name = "windmill_api_sandbox"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[dependencies]
|
||||
windmill-api-auth.workspace = true
|
||||
windmill-common = { workspace = true, default-features = false }
|
||||
windmill-sandbox.workspace = true
|
||||
axum.workspace = true
|
||||
chrono.workspace = true
|
||||
reqwest.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
sqlx.workspace = true
|
||||
tracing.workspace = true
|
||||
uuid.workspace = true
|
||||
703
backend/windmill-api-sandbox/src/lib.rs
Normal file
703
backend/windmill-api-sandbox/src/lib.rs
Normal file
@@ -0,0 +1,703 @@
|
||||
/*
|
||||
* Author: Windmill Labs, Inc
|
||||
* Copyright: Windmill Labs, Inc 2024
|
||||
* This file and its contents are licensed under the AGPLv3 License.
|
||||
* Please see the included NOTICE for copyright information and
|
||||
* LICENSE-AGPL for a copy of the license.
|
||||
*/
|
||||
|
||||
use axum::{
|
||||
extract::{Path, Query},
|
||||
routing::{delete, get, post},
|
||||
Extension, Json, Router,
|
||||
};
|
||||
use chrono::Utc;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
use windmill_common::{
|
||||
db::UserDB,
|
||||
error::{Error, JsonResult},
|
||||
};
|
||||
use windmill_sandbox::{ExecRequest, ExecResult, SandboxConfig, SandboxInfo, SandboxStatus};
|
||||
|
||||
use windmill_api_auth::ApiAuthed;
|
||||
|
||||
pub fn workspaced_service() -> Router {
|
||||
Router::new()
|
||||
.route("/create", post(create_sandbox))
|
||||
.route("/list", get(list_sandboxes))
|
||||
.route("/:sandbox_id", get(get_sandbox))
|
||||
.route("/:sandbox_id", delete(delete_sandbox))
|
||||
.route("/:sandbox_id/exec", post(exec_sandbox))
|
||||
.route("/:sandbox_id/suspend", post(suspend_sandbox))
|
||||
.route("/:sandbox_id/resume", post(resume_sandbox))
|
||||
.route("/:sandbox_id/terminate", post(terminate_sandbox))
|
||||
.route("/:sandbox_id/write_file", post(write_file))
|
||||
.route("/:sandbox_id/read_file", get(read_file))
|
||||
.route("/:sandbox_id/execs", get(list_execs))
|
||||
}
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct SandboxRow {
|
||||
id: Uuid,
|
||||
workspace_id: String,
|
||||
status: SandboxStatus,
|
||||
image: Option<String>,
|
||||
labels: serde_json::Value,
|
||||
mode: String,
|
||||
created_by: String,
|
||||
created_at: chrono::DateTime<Utc>,
|
||||
started_at: Option<chrono::DateTime<Utc>>,
|
||||
last_activity_at: Option<chrono::DateTime<Utc>>,
|
||||
suspended_at: Option<chrono::DateTime<Utc>>,
|
||||
stopped_at: Option<chrono::DateTime<Utc>>,
|
||||
error_message: Option<String>,
|
||||
ephemeral: bool,
|
||||
cpu_limit: i32,
|
||||
memory_limit_mb: i32,
|
||||
network_enabled: bool,
|
||||
}
|
||||
|
||||
impl From<SandboxRow> for SandboxInfo {
|
||||
fn from(r: SandboxRow) -> Self {
|
||||
SandboxInfo {
|
||||
id: r.id,
|
||||
workspace_id: r.workspace_id,
|
||||
status: r.status,
|
||||
image: r.image,
|
||||
labels: r.labels,
|
||||
mode: r.mode,
|
||||
created_by: r.created_by,
|
||||
created_at: r.created_at,
|
||||
started_at: r.started_at,
|
||||
last_activity_at: r.last_activity_at,
|
||||
suspended_at: r.suspended_at,
|
||||
stopped_at: r.stopped_at,
|
||||
error_message: r.error_message,
|
||||
ephemeral: r.ephemeral,
|
||||
cpu_limit: r.cpu_limit,
|
||||
memory_limit_mb: r.memory_limit_mb,
|
||||
network_enabled: r.network_enabled,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
struct SandboxStatusRow {
|
||||
status: SandboxStatus,
|
||||
host_id: Option<String>,
|
||||
mode: String,
|
||||
}
|
||||
|
||||
async fn create_sandbox(
|
||||
authed: ApiAuthed,
|
||||
Path(w_id): Path<String>,
|
||||
Extension(user_db): Extension<UserDB>,
|
||||
Json(config): Json<SandboxConfig>,
|
||||
) -> JsonResult<SandboxInfo> {
|
||||
let mut tx = user_db.begin(&authed).await?;
|
||||
|
||||
let mode = config.mode.to_string();
|
||||
let ephemeral = config.ephemeral || config.mode == windmill_sandbox::SandboxMode::Embedded;
|
||||
let expires_at = config
|
||||
.timeout_secs
|
||||
.map(|t| Utc::now() + chrono::Duration::seconds(t as i64));
|
||||
|
||||
let row = sqlx::query_as::<_, (Uuid, chrono::DateTime<Utc>)>(
|
||||
"INSERT INTO sandbox (
|
||||
workspace_id, created_by, image, timeout_secs, idle_timeout_secs,
|
||||
cpu_limit, memory_limit_mb, disk_limit_mb, env_vars, labels,
|
||||
mounts, network_enabled, mode, status, ephemeral,
|
||||
auto_stop_after_secs, expires_at, started_at, last_activity_at
|
||||
) VALUES (
|
||||
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10,
|
||||
$11, $12, $13, 'creating', $14, $15, $16, now(), now()
|
||||
) RETURNING id, created_at",
|
||||
)
|
||||
.bind(&w_id)
|
||||
.bind(&authed.username)
|
||||
.bind(config.image.as_deref())
|
||||
.bind(config.timeout_secs)
|
||||
.bind(config.idle_timeout_secs)
|
||||
.bind(config.cpu_limit)
|
||||
.bind(config.memory_limit_mb)
|
||||
.bind(config.disk_limit_mb)
|
||||
.bind(&config.env_vars)
|
||||
.bind(&config.labels)
|
||||
.bind(&config.mounts)
|
||||
.bind(config.network_enabled)
|
||||
.bind(&mode)
|
||||
.bind(ephemeral)
|
||||
.bind(config.auto_stop_after_secs)
|
||||
.bind(expires_at)
|
||||
.fetch_one(&mut *tx)
|
||||
.await?;
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
Ok(Json(SandboxInfo {
|
||||
id: row.0,
|
||||
workspace_id: w_id,
|
||||
status: SandboxStatus::Creating,
|
||||
image: config.image,
|
||||
labels: config.labels,
|
||||
mode,
|
||||
created_by: authed.username,
|
||||
created_at: row.1,
|
||||
started_at: None,
|
||||
last_activity_at: None,
|
||||
suspended_at: None,
|
||||
stopped_at: None,
|
||||
error_message: None,
|
||||
ephemeral,
|
||||
cpu_limit: config.cpu_limit,
|
||||
memory_limit_mb: config.memory_limit_mb,
|
||||
network_enabled: config.network_enabled,
|
||||
}))
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct ListSandboxesQuery {
|
||||
status: Option<String>,
|
||||
label_key: Option<String>,
|
||||
label_value: Option<String>,
|
||||
}
|
||||
|
||||
/// Column list shared by every query that loads a `SandboxRow`.
const SANDBOX_SELECT_COLS: &str = concat!(
    "id, workspace_id, status, image, labels, mode, ",
    "created_by, created_at, started_at, last_activity_at, ",
    "suspended_at, stopped_at, error_message, ephemeral, ",
    "cpu_limit, memory_limit_mb, network_enabled",
);
|
||||
|
||||
async fn list_sandboxes(
|
||||
authed: ApiAuthed,
|
||||
Path(w_id): Path<String>,
|
||||
Extension(user_db): Extension<UserDB>,
|
||||
Query(query): Query<ListSandboxesQuery>,
|
||||
) -> JsonResult<Vec<SandboxInfo>> {
|
||||
let mut tx = user_db.begin(&authed).await?;
|
||||
|
||||
let statuses: Option<Vec<String>> = query
|
||||
.status
|
||||
.map(|s| s.split(',').map(|s| s.trim().to_string()).collect());
|
||||
|
||||
let rows = sqlx::query_as::<_, SandboxRow>(&format!(
|
||||
"SELECT {SANDBOX_SELECT_COLS}
|
||||
FROM sandbox
|
||||
WHERE workspace_id = $1
|
||||
AND ($2::text[] IS NULL OR status::text = ANY($2))
|
||||
AND ($3::text IS NULL OR $4::text IS NULL OR labels->>$3 = $4)
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 100"
|
||||
))
|
||||
.bind(&w_id)
|
||||
.bind(statuses.as_deref())
|
||||
.bind(query.label_key.as_deref())
|
||||
.bind(query.label_value.as_deref())
|
||||
.fetch_all(&mut *tx)
|
||||
.await?;
|
||||
|
||||
let sandboxes = rows.into_iter().map(SandboxInfo::from).collect();
|
||||
Ok(Json(sandboxes))
|
||||
}
|
||||
|
||||
async fn get_sandbox(
|
||||
authed: ApiAuthed,
|
||||
Path((w_id, sandbox_id)): Path<(String, Uuid)>,
|
||||
Extension(user_db): Extension<UserDB>,
|
||||
) -> JsonResult<SandboxInfo> {
|
||||
let mut tx = user_db.begin(&authed).await?;
|
||||
|
||||
let r = sqlx::query_as::<_, SandboxRow>(&format!(
|
||||
"SELECT {SANDBOX_SELECT_COLS} FROM sandbox WHERE id = $1 AND workspace_id = $2"
|
||||
))
|
||||
.bind(sandbox_id)
|
||||
.bind(&w_id)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?
|
||||
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
|
||||
|
||||
Ok(Json(r.into()))
|
||||
}
|
||||
|
||||
async fn delete_sandbox(
|
||||
authed: ApiAuthed,
|
||||
Path((w_id, sandbox_id)): Path<(String, Uuid)>,
|
||||
Extension(user_db): Extension<UserDB>,
|
||||
) -> JsonResult<String> {
|
||||
let mut tx = user_db.begin(&authed).await?;
|
||||
|
||||
let status = sqlx::query_scalar::<_, SandboxStatus>(
|
||||
"SELECT status FROM sandbox WHERE id = $1 AND workspace_id = $2",
|
||||
)
|
||||
.bind(sandbox_id)
|
||||
.bind(&w_id)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?
|
||||
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
|
||||
|
||||
if status != SandboxStatus::Stopped && status != SandboxStatus::Error {
|
||||
return Err(Error::BadRequest(
|
||||
"Sandbox must be stopped or in error state before deletion".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
sqlx::query("DELETE FROM sandbox WHERE id = $1")
|
||||
.bind(sandbox_id)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
Ok(Json(format!("Sandbox {sandbox_id} deleted")))
|
||||
}
|
||||
|
||||
async fn exec_sandbox(
|
||||
authed: ApiAuthed,
|
||||
Path((w_id, sandbox_id)): Path<(String, Uuid)>,
|
||||
Extension(user_db): Extension<UserDB>,
|
||||
Json(request): Json<ExecRequest>,
|
||||
) -> JsonResult<ExecResult> {
|
||||
let mut tx = user_db.begin(&authed).await?;
|
||||
|
||||
let sandbox = sqlx::query_as::<_, SandboxStatusRow>(
|
||||
"SELECT status, host_id, mode FROM sandbox WHERE id = $1 AND workspace_id = $2",
|
||||
)
|
||||
.bind(sandbox_id)
|
||||
.bind(&w_id)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?
|
||||
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
|
||||
|
||||
if sandbox.status != SandboxStatus::Running {
|
||||
return Err(Error::BadRequest(format!(
|
||||
"Sandbox is not running (status: {})",
|
||||
sandbox.status
|
||||
)));
|
||||
}
|
||||
|
||||
let host_base_url = if sandbox.mode == "remote" {
|
||||
let host_id = sandbox.host_id.as_ref().ok_or_else(|| {
|
||||
Error::InternalErr("Remote sandbox has no host_id".to_string())
|
||||
})?;
|
||||
let host = sqlx::query_scalar::<_, String>(
|
||||
"SELECT base_url FROM sandbox_host WHERE id = $1",
|
||||
)
|
||||
.bind(host_id)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?
|
||||
.ok_or_else(|| Error::NotFound(format!("Sandbox host {host_id} not found")))?;
|
||||
Some(host)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let started_at = Utc::now();
|
||||
|
||||
let result = if let Some(base_url) = host_base_url {
|
||||
proxy_exec(&base_url, sandbox_id, &request).await?
|
||||
} else {
|
||||
return Err(Error::BadRequest(
|
||||
"Embedded sandbox exec must be performed directly via WM_SANDBOX_URL".to_string(),
|
||||
));
|
||||
};
|
||||
|
||||
let completed_at = Utc::now();
|
||||
let duration_ms = (completed_at - started_at).num_milliseconds();
|
||||
|
||||
sqlx::query(
|
||||
"INSERT INTO sandbox_exec (
|
||||
sandbox_id, workspace_id, command, cwd, env_vars,
|
||||
started_at, completed_at, exit_code, stdout, stderr,
|
||||
duration_ms, created_by
|
||||
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
|
||||
)
|
||||
.bind(sandbox_id)
|
||||
.bind(&w_id)
|
||||
.bind(&request.command)
|
||||
.bind(request.cwd.as_deref())
|
||||
.bind(&request.env)
|
||||
.bind(started_at)
|
||||
.bind(completed_at)
|
||||
.bind(result.exit_code)
|
||||
.bind(&result.stdout)
|
||||
.bind(&result.stderr)
|
||||
.bind(duration_ms)
|
||||
.bind(&authed.username)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
sqlx::query("UPDATE sandbox SET last_activity_at = now() WHERE id = $1")
|
||||
.bind(sandbox_id)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
Ok(Json(result))
|
||||
}
|
||||
|
||||
async fn proxy_exec(
|
||||
base_url: &str,
|
||||
sandbox_id: Uuid,
|
||||
request: &ExecRequest,
|
||||
) -> windmill_common::error::Result<ExecResult> {
|
||||
let client = reqwest::Client::new();
|
||||
let url = format!("{}/sandbox/{}/exec", base_url, sandbox_id);
|
||||
let resp = client
|
||||
.post(&url)
|
||||
.json(request)
|
||||
.timeout(std::time::Duration::from_secs(
|
||||
request.timeout_secs.unwrap_or(300) as u64 + 5,
|
||||
))
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| Error::InternalErr(format!("Failed to proxy exec to sandbox host: {e}")))?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
let body = resp.text().await.unwrap_or_default();
|
||||
return Err(Error::InternalErr(format!(
|
||||
"Sandbox host returned error: {body}"
|
||||
)));
|
||||
}
|
||||
|
||||
resp.json::<ExecResult>()
|
||||
.await
|
||||
.map_err(|e| Error::InternalErr(format!("Failed to parse exec result: {e}")))
|
||||
}
|
||||
|
||||
async fn proxy_action(
|
||||
base_url: &str,
|
||||
sandbox_id: Uuid,
|
||||
action: &str,
|
||||
) -> windmill_common::error::Result<SandboxStatus> {
|
||||
let client = reqwest::Client::new();
|
||||
let url = format!("{}/sandbox/{}/{}", base_url, sandbox_id, action);
|
||||
let resp = client
|
||||
.post(&url)
|
||||
.timeout(std::time::Duration::from_secs(30))
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| {
|
||||
Error::InternalErr(format!("Failed to proxy {action} to sandbox host: {e}"))
|
||||
})?;
|
||||
|
||||
if !resp.status().is_success() {
|
||||
let body = resp.text().await.unwrap_or_default();
|
||||
return Err(Error::InternalErr(format!(
|
||||
"Sandbox host returned error: {body}"
|
||||
)));
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct StatusResponse {
|
||||
status: SandboxStatus,
|
||||
}
|
||||
|
||||
let status_resp = resp.json::<StatusResponse>().await.map_err(|e| {
|
||||
Error::InternalErr(format!("Failed to parse action response: {e}"))
|
||||
})?;
|
||||
|
||||
Ok(status_resp.status)
|
||||
}
|
||||
|
||||
async fn get_host_base_url(
|
||||
tx: &mut sqlx::PgConnection,
|
||||
host_id: &str,
|
||||
) -> windmill_common::error::Result<String> {
|
||||
sqlx::query_scalar::<_, String>("SELECT base_url FROM sandbox_host WHERE id = $1")
|
||||
.bind(host_id)
|
||||
.fetch_one(&mut *tx)
|
||||
.await
|
||||
.map_err(|_| Error::NotFound(format!("Sandbox host {host_id} not found")))
|
||||
}
|
||||
|
||||
async fn suspend_sandbox(
|
||||
authed: ApiAuthed,
|
||||
Path((w_id, sandbox_id)): Path<(String, Uuid)>,
|
||||
Extension(user_db): Extension<UserDB>,
|
||||
) -> JsonResult<SandboxInfo> {
|
||||
let mut tx = user_db.clone().begin(&authed).await?;
|
||||
|
||||
let sandbox = sqlx::query_as::<_, SandboxStatusRow>(
|
||||
"SELECT status, host_id, mode FROM sandbox WHERE id = $1 AND workspace_id = $2",
|
||||
)
|
||||
.bind(sandbox_id)
|
||||
.bind(&w_id)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?
|
||||
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
|
||||
|
||||
if sandbox.status != SandboxStatus::Running {
|
||||
return Err(Error::BadRequest(format!(
|
||||
"Can only suspend a running sandbox (status: {})",
|
||||
sandbox.status
|
||||
)));
|
||||
}
|
||||
|
||||
if sandbox.mode == "remote" {
|
||||
let host_id = sandbox.host_id.as_ref().ok_or_else(|| {
|
||||
Error::InternalErr("Remote sandbox has no host_id".to_string())
|
||||
})?;
|
||||
let base_url = get_host_base_url(&mut *tx, host_id).await?;
|
||||
proxy_action(&base_url, sandbox_id, "suspend").await?;
|
||||
}
|
||||
|
||||
sqlx::query(
|
||||
"UPDATE sandbox SET status = 'suspended', suspended_at = now() WHERE id = $1",
|
||||
)
|
||||
.bind(sandbox_id)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
get_sandbox_info(user_db, &authed, &w_id, sandbox_id).await
|
||||
}
|
||||
|
||||
async fn resume_sandbox(
|
||||
authed: ApiAuthed,
|
||||
Path((w_id, sandbox_id)): Path<(String, Uuid)>,
|
||||
Extension(user_db): Extension<UserDB>,
|
||||
) -> JsonResult<SandboxInfo> {
|
||||
let mut tx = user_db.clone().begin(&authed).await?;
|
||||
|
||||
let sandbox = sqlx::query_as::<_, SandboxStatusRow>(
|
||||
"SELECT status, host_id, mode FROM sandbox WHERE id = $1 AND workspace_id = $2",
|
||||
)
|
||||
.bind(sandbox_id)
|
||||
.bind(&w_id)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?
|
||||
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
|
||||
|
||||
if sandbox.status != SandboxStatus::Suspended {
|
||||
return Err(Error::BadRequest(format!(
|
||||
"Can only resume a suspended sandbox (status: {})",
|
||||
sandbox.status
|
||||
)));
|
||||
}
|
||||
|
||||
if sandbox.mode == "remote" {
|
||||
let host_id = sandbox.host_id.as_ref().ok_or_else(|| {
|
||||
Error::InternalErr("Remote sandbox has no host_id".to_string())
|
||||
})?;
|
||||
let base_url = get_host_base_url(&mut *tx, host_id).await?;
|
||||
proxy_action(&base_url, sandbox_id, "resume").await?;
|
||||
}
|
||||
|
||||
sqlx::query(
|
||||
"UPDATE sandbox SET status = 'running', suspended_at = NULL, last_activity_at = now() WHERE id = $1",
|
||||
)
|
||||
.bind(sandbox_id)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
get_sandbox_info(user_db, &authed, &w_id, sandbox_id).await
|
||||
}
|
||||
|
||||
async fn terminate_sandbox(
|
||||
authed: ApiAuthed,
|
||||
Path((w_id, sandbox_id)): Path<(String, Uuid)>,
|
||||
Extension(user_db): Extension<UserDB>,
|
||||
) -> JsonResult<SandboxInfo> {
|
||||
let mut tx = user_db.clone().begin(&authed).await?;
|
||||
|
||||
let sandbox = sqlx::query_as::<_, SandboxStatusRow>(
|
||||
"SELECT status, host_id, mode FROM sandbox WHERE id = $1 AND workspace_id = $2",
|
||||
)
|
||||
.bind(sandbox_id)
|
||||
.bind(&w_id)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?
|
||||
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
|
||||
|
||||
if sandbox.status == SandboxStatus::Stopped {
|
||||
return get_sandbox_info(user_db, &authed, &w_id, sandbox_id).await;
|
||||
}
|
||||
|
||||
if sandbox.mode == "remote" {
|
||||
if let Some(host_id) = sandbox.host_id.as_ref() {
|
||||
if let Ok(base_url) = get_host_base_url(&mut *tx, host_id).await {
|
||||
let _ = proxy_action(&base_url, sandbox_id, "terminate").await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sqlx::query(
|
||||
"UPDATE sandbox SET status = 'stopped', stopped_at = now() WHERE id = $1",
|
||||
)
|
||||
.bind(sandbox_id)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
get_sandbox_info(user_db, &authed, &w_id, sandbox_id).await
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize)]
|
||||
struct WriteFileBody {
|
||||
path: String,
|
||||
content: String,
|
||||
}
|
||||
|
||||
async fn write_file(
|
||||
authed: ApiAuthed,
|
||||
Path((w_id, sandbox_id)): Path<(String, Uuid)>,
|
||||
Extension(user_db): Extension<UserDB>,
|
||||
Json(body): Json<WriteFileBody>,
|
||||
) -> JsonResult<String> {
|
||||
let mut tx = user_db.begin(&authed).await?;
|
||||
|
||||
let sandbox = sqlx::query_as::<_, SandboxStatusRow>(
|
||||
"SELECT status, host_id, mode FROM sandbox WHERE id = $1 AND workspace_id = $2",
|
||||
)
|
||||
.bind(sandbox_id)
|
||||
.bind(&w_id)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?
|
||||
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
|
||||
|
||||
if sandbox.status == SandboxStatus::Stopped {
|
||||
return Err(Error::BadRequest("Sandbox is stopped".to_string()));
|
||||
}
|
||||
|
||||
if sandbox.mode == "remote" {
|
||||
let host_id = sandbox.host_id.as_ref().ok_or_else(|| {
|
||||
Error::InternalErr("Remote sandbox has no host_id".to_string())
|
||||
})?;
|
||||
let base_url = get_host_base_url(&mut *tx, host_id).await?;
|
||||
let client = reqwest::Client::new();
|
||||
let url = format!("{}/sandbox/{}/write_file", base_url, sandbox_id);
|
||||
let resp = client
|
||||
.post(&url)
|
||||
.json(&body)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| Error::InternalErr(format!("Failed to proxy write_file: {e}")))?;
|
||||
if !resp.status().is_success() {
|
||||
let err = resp.text().await.unwrap_or_default();
|
||||
return Err(Error::InternalErr(format!("Write file failed: {err}")));
|
||||
}
|
||||
}
|
||||
|
||||
sqlx::query("UPDATE sandbox SET last_activity_at = now() WHERE id = $1")
|
||||
.bind(sandbox_id)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
Ok(Json("ok".to_string()))
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct ReadFileQuery {
|
||||
path: String,
|
||||
}
|
||||
|
||||
async fn read_file(
|
||||
authed: ApiAuthed,
|
||||
Path((w_id, sandbox_id)): Path<(String, Uuid)>,
|
||||
Extension(user_db): Extension<UserDB>,
|
||||
Query(query): Query<ReadFileQuery>,
|
||||
) -> JsonResult<String> {
|
||||
let mut tx = user_db.begin(&authed).await?;
|
||||
|
||||
let sandbox = sqlx::query_as::<_, SandboxStatusRow>(
|
||||
"SELECT status, host_id, mode FROM sandbox WHERE id = $1 AND workspace_id = $2",
|
||||
)
|
||||
.bind(sandbox_id)
|
||||
.bind(&w_id)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?
|
||||
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
|
||||
|
||||
if sandbox.status == SandboxStatus::Stopped {
|
||||
return Err(Error::BadRequest("Sandbox is stopped".to_string()));
|
||||
}
|
||||
|
||||
if sandbox.mode == "remote" {
|
||||
let host_id = sandbox.host_id.as_ref().ok_or_else(|| {
|
||||
Error::InternalErr("Remote sandbox has no host_id".to_string())
|
||||
})?;
|
||||
let base_url = get_host_base_url(&mut *tx, host_id).await?;
|
||||
let client = reqwest::Client::new();
|
||||
let url = format!(
|
||||
"{}/sandbox/{}/read_file?path={}",
|
||||
base_url, sandbox_id, query.path
|
||||
);
|
||||
let resp = client
|
||||
.get(&url)
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| Error::InternalErr(format!("Failed to proxy read_file: {e}")))?;
|
||||
if !resp.status().is_success() {
|
||||
let err = resp.text().await.unwrap_or_default();
|
||||
return Err(Error::InternalErr(format!("Read file failed: {err}")));
|
||||
}
|
||||
let content = resp.text().await.map_err(|e| {
|
||||
Error::InternalErr(format!("Failed to read response body: {e}"))
|
||||
})?;
|
||||
return Ok(Json(content));
|
||||
}
|
||||
|
||||
Err(Error::BadRequest(
|
||||
"Embedded sandbox file ops must be performed directly via WM_SANDBOX_URL".to_string(),
|
||||
))
|
||||
}
|
||||
|
||||
#[derive(Serialize, sqlx::FromRow)]
|
||||
struct ExecRecord {
|
||||
id: Uuid,
|
||||
sandbox_id: Uuid,
|
||||
command: String,
|
||||
started_at: chrono::DateTime<Utc>,
|
||||
completed_at: Option<chrono::DateTime<Utc>>,
|
||||
exit_code: Option<i32>,
|
||||
stdout: Option<String>,
|
||||
stderr: Option<String>,
|
||||
duration_ms: Option<i64>,
|
||||
created_by: String,
|
||||
}
|
||||
|
||||
/// List the most recent command executions of one sandbox.
///
/// The query runs inside a transaction opened through `user_db.begin(&authed)`
/// and is filtered by both sandbox id and workspace id. At most 100 rows are
/// returned, newest `started_at` first.
async fn list_execs(
    authed: ApiAuthed,
    Path((w_id, sandbox_id)): Path<(String, Uuid)>,
    Extension(user_db): Extension<UserDB>,
) -> JsonResult<Vec<ExecRecord>> {
    let mut tx = user_db.begin(&authed).await?;

    // Read-only query: the transaction is dropped without an explicit commit.
    let rows = sqlx::query_as::<_, ExecRecord>(
        "SELECT id, sandbox_id, command, started_at, completed_at,
        exit_code, stdout, stderr, duration_ms, created_by
        FROM sandbox_exec
        WHERE sandbox_id = $1 AND workspace_id = $2
        ORDER BY started_at DESC
        LIMIT 100",
    )
    .bind(sandbox_id)
    .bind(&w_id)
    .fetch_all(&mut *tx)
    .await?;

    Ok(Json(rows))
}
|
||||
|
||||
async fn get_sandbox_info(
|
||||
user_db: UserDB,
|
||||
authed: &ApiAuthed,
|
||||
w_id: &str,
|
||||
sandbox_id: Uuid,
|
||||
) -> JsonResult<SandboxInfo> {
|
||||
let mut tx = user_db.begin(authed).await?;
|
||||
|
||||
let r = sqlx::query_as::<_, SandboxRow>(&format!(
|
||||
"SELECT {SANDBOX_SELECT_COLS} FROM sandbox WHERE id = $1 AND workspace_id = $2"
|
||||
))
|
||||
.bind(sandbox_id)
|
||||
.bind(w_id)
|
||||
.fetch_one(&mut *tx)
|
||||
.await?;
|
||||
|
||||
Ok(Json(r.into()))
|
||||
}
|
||||
@@ -153,6 +153,7 @@ windmill-api-flow-conversations.workspace = true
|
||||
windmill-api-inputs.workspace = true
|
||||
windmill-api-npm-proxy.workspace = true
|
||||
windmill-api-openapi = { workspace = true, optional = true }
|
||||
windmill-api-sandbox.workspace = true
|
||||
windmill-api-schedule.workspace = true
|
||||
windmill-api-settings = { workspace = true }
|
||||
windmill-api-workers.workspace = true
|
||||
|
||||
@@ -509,6 +509,7 @@ pub async fn run_server(
|
||||
.nest("/npm_proxy", windmill_api_npm_proxy::workspaced_service())
|
||||
.nest("/raw_apps", raw_apps::workspaced_service())
|
||||
.nest("/resources", resources::workspaced_service())
|
||||
.nest("/sandboxes", windmill_api_sandbox::workspaced_service())
|
||||
.nest("/schedules", windmill_api_schedule::workspaced_service())
|
||||
.nest("/scripts", scripts::workspaced_service())
|
||||
.nest(
|
||||
|
||||
21
backend/windmill-sandbox/Cargo.toml
Normal file
21
backend/windmill-sandbox/Cargo.toml
Normal file
@@ -0,0 +1,21 @@
|
||||
[package]
|
||||
name = "windmill-sandbox"
|
||||
version.workspace = true
|
||||
authors.workspace = true
|
||||
edition.workspace = true
|
||||
|
||||
[lib]
|
||||
name = "windmill_sandbox"
|
||||
path = "src/lib.rs"
|
||||
|
||||
[dependencies]
|
||||
axum.workspace = true
|
||||
chrono.workspace = true
|
||||
nix.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
sqlx.workspace = true
|
||||
tokio.workspace = true
|
||||
tracing.workspace = true
|
||||
uuid.workspace = true
|
||||
windmill-common = { workspace = true, default-features = false }
|
||||
14
backend/windmill-sandbox/src/lib.rs
Normal file
14
backend/windmill-sandbox/src/lib.rs
Normal file
@@ -0,0 +1,14 @@
|
||||
/*
|
||||
* Author: Windmill Labs, Inc
|
||||
* Copyright: Windmill Labs, Inc 2024
|
||||
* This file and its contents are licensed under the AGPLv3 License.
|
||||
* Please see the included NOTICE for copyright information and
|
||||
* LICENSE-AGPL for a copy of the license.
|
||||
*/
|
||||
|
||||
pub mod manager;
|
||||
pub mod nsjail;
|
||||
pub mod types;
|
||||
|
||||
pub use manager::SandboxManager;
|
||||
pub use types::*;
|
||||
302
backend/windmill-sandbox/src/manager.rs
Normal file
302
backend/windmill-sandbox/src/manager.rs
Normal file
@@ -0,0 +1,302 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use chrono::Utc;
|
||||
use tokio::sync::Mutex;
|
||||
use uuid::Uuid;
|
||||
use windmill_common::error::{Error, Result};
|
||||
|
||||
use crate::nsjail;
|
||||
use crate::types::*;
|
||||
|
||||
/// In-memory bookkeeping for one sandbox tracked by `SandboxManager`.
///
/// Fields prefixed with `_` are stored but not read anywhere in this module.
struct SandboxProcess {
    _id: Uuid,
    // Lifecycle state; drives which operations are allowed.
    status: SandboxStatus,
    // Handle to the spawned nsjail process; set to `None` once terminated.
    nsjail_child: Option<tokio::process::Child>,
    // OS PID of the nsjail process, used for signalling and `nsenter`.
    nsjail_pid: Option<u32>,
    // Host directory backing the sandbox; removed on terminate.
    sandbox_dir: String,
    _config: SandboxConfig,
    _started_at: Option<chrono::DateTime<Utc>>,
    // Refreshed on exec, write_file and resume.
    last_activity_at: Option<chrono::DateTime<Utc>>,
    // Set while suspended, cleared on resume.
    suspended_at: Option<chrono::DateTime<Utc>>,
    _config_path: String,
}
|
||||
|
||||
/// Owns every sandbox started by this process and the directory they live in.
pub struct SandboxManager {
    // All known sandboxes keyed by id, guarded by an async mutex.
    sandboxes: Arc<Mutex<HashMap<Uuid, SandboxProcess>>>,
    // Parent directory; each sandbox gets `<base_dir>/<uuid>`.
    base_dir: String,
}
|
||||
|
||||
impl SandboxManager {
|
||||
pub fn new(base_dir: String) -> Self {
|
||||
std::fs::create_dir_all(&base_dir).ok();
|
||||
Self {
|
||||
sandboxes: Arc::new(Mutex::new(HashMap::new())),
|
||||
base_dir,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn create_sandbox(&self, config: SandboxConfig) -> Result<(Uuid, SandboxStatus)> {
|
||||
let id = Uuid::new_v4();
|
||||
let sandbox_dir = format!("{}/{}", self.base_dir, id);
|
||||
std::fs::create_dir_all(&sandbox_dir)
|
||||
.map_err(|e| Error::InternalErr(format!("Failed to create sandbox dir: {e}")))?;
|
||||
|
||||
let nsjail_config = nsjail::NsjailConfig {
|
||||
sandbox_dir: sandbox_dir.clone(),
|
||||
clone_newnet: config.network_enabled,
|
||||
clone_newuser: !is_nuser_disabled(),
|
||||
memory_limit_bytes: (config.memory_limit_mb as u64) * 1024 * 1024,
|
||||
cpu_ms_per_sec: (config.cpu_limit as u32) * 1000,
|
||||
disk_limit_bytes: (config.disk_limit_mb as u64) * 1024 * 1024,
|
||||
extra_mounts: vec![],
|
||||
};
|
||||
|
||||
let config_content = nsjail::render_config(&nsjail_config);
|
||||
let config_path = format!("{}/sandbox.config.proto", sandbox_dir);
|
||||
std::fs::write(&config_path, &config_content)
|
||||
.map_err(|e| Error::InternalErr(format!("Failed to write nsjail config: {e}")))?;
|
||||
|
||||
let env_vars: Vec<(String, String)> = if let Some(obj) = config.env_vars.as_object() {
|
||||
obj.iter()
|
||||
.map(|(k, v)| (k.clone(), v.as_str().unwrap_or_default().to_string()))
|
||||
.collect()
|
||||
} else {
|
||||
vec![]
|
||||
};
|
||||
|
||||
let child = nsjail::spawn_nsjail(&config_path, &env_vars).await?;
|
||||
let nsjail_pid = child.id();
|
||||
|
||||
let process = SandboxProcess {
|
||||
_id: id,
|
||||
status: SandboxStatus::Running,
|
||||
nsjail_child: Some(child),
|
||||
nsjail_pid,
|
||||
sandbox_dir,
|
||||
_config: config,
|
||||
_started_at: Some(Utc::now()),
|
||||
last_activity_at: Some(Utc::now()),
|
||||
suspended_at: None,
|
||||
_config_path: config_path,
|
||||
};
|
||||
|
||||
self.sandboxes.lock().await.insert(id, process);
|
||||
Ok((id, SandboxStatus::Running))
|
||||
}
|
||||
|
||||
pub async fn exec(
|
||||
&self,
|
||||
sandbox_id: Uuid,
|
||||
request: ExecRequest,
|
||||
) -> Result<ExecResult> {
|
||||
let (child_pid, _sandbox_dir) = {
|
||||
let mut sandboxes = self.sandboxes.lock().await;
|
||||
let sb = sandboxes
|
||||
.get_mut(&sandbox_id)
|
||||
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
|
||||
|
||||
if sb.status != SandboxStatus::Running {
|
||||
return Err(Error::BadRequest(format!(
|
||||
"Sandbox is not running (status: {})",
|
||||
sb.status
|
||||
)));
|
||||
}
|
||||
sb.last_activity_at = Some(Utc::now());
|
||||
let pid = sb
|
||||
.nsjail_pid
|
||||
.ok_or_else(|| Error::InternalErr("No PID for sandbox".to_string()))?;
|
||||
(pid, sb.sandbox_dir.clone())
|
||||
};
|
||||
|
||||
let env_vars: Vec<(String, String)> = request
|
||||
.env
|
||||
.as_ref()
|
||||
.and_then(|v| v.as_object())
|
||||
.map(|obj| {
|
||||
obj.iter()
|
||||
.map(|(k, v)| (k.clone(), v.as_str().unwrap_or_default().to_string()))
|
||||
.collect()
|
||||
})
|
||||
.unwrap_or_default();
|
||||
|
||||
let started_at = Utc::now();
|
||||
let (exit_code, stdout, stderr) = nsjail::exec_in_sandbox(
|
||||
child_pid,
|
||||
&request.command,
|
||||
request.cwd.as_deref(),
|
||||
&env_vars,
|
||||
request.timeout_secs.map(|s| s as u32),
|
||||
)
|
||||
.await?;
|
||||
let duration_ms = (Utc::now() - started_at).num_milliseconds();
|
||||
|
||||
Ok(ExecResult {
|
||||
exec_id: Uuid::new_v4(),
|
||||
exit_code,
|
||||
stdout,
|
||||
stderr,
|
||||
duration_ms,
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn suspend(&self, sandbox_id: Uuid) -> Result<SandboxStatus> {
|
||||
let mut sandboxes = self.sandboxes.lock().await;
|
||||
let sb = sandboxes
|
||||
.get_mut(&sandbox_id)
|
||||
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
|
||||
|
||||
if sb.status != SandboxStatus::Running {
|
||||
return Err(Error::BadRequest(format!(
|
||||
"Can only suspend a running sandbox (status: {})",
|
||||
sb.status
|
||||
)));
|
||||
}
|
||||
|
||||
let pid = sb
|
||||
.nsjail_pid
|
||||
.ok_or_else(|| Error::InternalErr("No PID for sandbox".to_string()))?;
|
||||
|
||||
nsjail::send_signal(pid, nix::sys::signal::Signal::SIGSTOP)?;
|
||||
sb.status = SandboxStatus::Suspended;
|
||||
sb.suspended_at = Some(Utc::now());
|
||||
Ok(SandboxStatus::Suspended)
|
||||
}
|
||||
|
||||
pub async fn resume(&self, sandbox_id: Uuid) -> Result<SandboxStatus> {
|
||||
let mut sandboxes = self.sandboxes.lock().await;
|
||||
let sb = sandboxes
|
||||
.get_mut(&sandbox_id)
|
||||
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
|
||||
|
||||
if sb.status != SandboxStatus::Suspended {
|
||||
return Err(Error::BadRequest(format!(
|
||||
"Can only resume a suspended sandbox (status: {})",
|
||||
sb.status
|
||||
)));
|
||||
}
|
||||
|
||||
let pid = sb
|
||||
.nsjail_pid
|
||||
.ok_or_else(|| Error::InternalErr("No PID for sandbox".to_string()))?;
|
||||
|
||||
nsjail::send_signal(pid, nix::sys::signal::Signal::SIGCONT)?;
|
||||
sb.status = SandboxStatus::Running;
|
||||
sb.suspended_at = None;
|
||||
sb.last_activity_at = Some(Utc::now());
|
||||
Ok(SandboxStatus::Running)
|
||||
}
|
||||
|
||||
pub async fn terminate(&self, sandbox_id: Uuid) -> Result<SandboxStatus> {
|
||||
let mut sandboxes = self.sandboxes.lock().await;
|
||||
let sb = sandboxes
|
||||
.get_mut(&sandbox_id)
|
||||
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
|
||||
|
||||
if sb.status == SandboxStatus::Stopped {
|
||||
return Ok(SandboxStatus::Stopped);
|
||||
}
|
||||
|
||||
if let Some(pid) = sb.nsjail_pid {
|
||||
let _ = nsjail::send_signal(pid, nix::sys::signal::Signal::SIGTERM);
|
||||
}
|
||||
|
||||
if let Some(ref mut child) = sb.nsjail_child {
|
||||
let _ = tokio::time::timeout(
|
||||
std::time::Duration::from_secs(5),
|
||||
child.wait(),
|
||||
)
|
||||
.await;
|
||||
child.kill().await.ok();
|
||||
}
|
||||
|
||||
sb.status = SandboxStatus::Stopped;
|
||||
sb.nsjail_child = None;
|
||||
|
||||
let _ = std::fs::remove_dir_all(&sb.sandbox_dir);
|
||||
|
||||
Ok(SandboxStatus::Stopped)
|
||||
}
|
||||
|
||||
pub async fn status(&self, sandbox_id: Uuid) -> Result<SandboxStatus> {
|
||||
let sandboxes = self.sandboxes.lock().await;
|
||||
let sb = sandboxes
|
||||
.get(&sandbox_id)
|
||||
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
|
||||
Ok(sb.status)
|
||||
}
|
||||
|
||||
pub async fn write_file(
|
||||
&self,
|
||||
sandbox_id: Uuid,
|
||||
path: &str,
|
||||
content: &[u8],
|
||||
) -> Result<()> {
|
||||
let sandbox_dir = {
|
||||
let mut sandboxes = self.sandboxes.lock().await;
|
||||
let sb = sandboxes
|
||||
.get_mut(&sandbox_id)
|
||||
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
|
||||
if sb.status == SandboxStatus::Stopped {
|
||||
return Err(Error::BadRequest("Sandbox is stopped".to_string()));
|
||||
}
|
||||
sb.last_activity_at = Some(Utc::now());
|
||||
sb.sandbox_dir.clone()
|
||||
};
|
||||
nsjail::write_to_sandbox(&sandbox_dir, path, content)
|
||||
}
|
||||
|
||||
pub async fn read_file(&self, sandbox_id: Uuid, path: &str) -> Result<String> {
|
||||
let sandbox_dir = {
|
||||
let sandboxes = self.sandboxes.lock().await;
|
||||
let sb = sandboxes
|
||||
.get(&sandbox_id)
|
||||
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
|
||||
if sb.status == SandboxStatus::Stopped {
|
||||
return Err(Error::BadRequest("Sandbox is stopped".to_string()));
|
||||
}
|
||||
sb.sandbox_dir.clone()
|
||||
};
|
||||
nsjail::read_from_sandbox(&sandbox_dir, path)
|
||||
}
|
||||
|
||||
/// Snapshot of every tracked sandbox as `(id, status)` pairs, in map
/// iteration order.
pub async fn list_sandboxes(&self) -> Vec<(Uuid, SandboxStatus)> {
    let guard = self.sandboxes.lock().await;
    let mut snapshot = Vec::with_capacity(guard.len());
    for (id, entry) in guard.iter() {
        snapshot.push((*id, entry.status));
    }
    snapshot
}
|
||||
|
||||
pub async fn cleanup_all(&self) {
|
||||
let ids: Vec<Uuid> = {
|
||||
let sandboxes = self.sandboxes.lock().await;
|
||||
sandboxes.keys().cloned().collect()
|
||||
};
|
||||
for id in ids {
|
||||
let _ = self.terminate(id).await;
|
||||
}
|
||||
let mut sandboxes = self.sandboxes.lock().await;
|
||||
sandboxes.clear();
|
||||
}
|
||||
|
||||
pub async fn remove_stopped(&self, sandbox_id: Uuid) -> Result<()> {
|
||||
let mut sandboxes = self.sandboxes.lock().await;
|
||||
let sb = sandboxes
|
||||
.get(&sandbox_id)
|
||||
.ok_or_else(|| Error::NotFound(format!("Sandbox {sandbox_id} not found")))?;
|
||||
if sb.status != SandboxStatus::Stopped {
|
||||
return Err(Error::BadRequest(
|
||||
"Can only remove stopped sandboxes".to_string(),
|
||||
));
|
||||
}
|
||||
sandboxes.remove(&sandbox_id);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// Whether the `DISABLE_NUSER` environment variable is set to exactly
/// `"true"` or `"1"`. Unset, unreadable or any other value means `false`.
fn is_nuser_disabled() -> bool {
    match std::env::var("DISABLE_NUSER") {
        Ok(value) => matches!(value.as_str(), "true" | "1"),
        Err(_) => false,
    }
}
|
||||
155
backend/windmill-sandbox/src/nsjail.rs
Normal file
155
backend/windmill-sandbox/src/nsjail.rs
Normal file
@@ -0,0 +1,155 @@
|
||||
use std::path::Path;
|
||||
use std::process::Stdio;
|
||||
|
||||
use tokio::process::Command;
|
||||
use windmill_common::error::{Error, Result};
|
||||
|
||||
const NSJAIL_CONFIG_TEMPLATE: &str = include_str!("../../windmill-worker/nsjail/sandbox.config.proto");
|
||||
|
||||
/// Values substituted into the nsjail config template by `render_config`.
pub struct NsjailConfig {
    // Fills `{SANDBOX_DIR}`.
    pub sandbox_dir: String,
    // NOTE: `render_config` writes the *negation* of this flag into both
    // `{CLONE_NEWNET}` and `{IFACE_NO_LO}`.
    pub clone_newnet: bool,
    // Fills `{CLONE_NEWUSER}` unchanged.
    pub clone_newuser: bool,
    // Fills `{MEMORY_LIMIT}`.
    pub memory_limit_bytes: u64,
    // Fills `{CPU_MS_PER_SEC}`.
    pub cpu_ms_per_sec: u32,
    // Fills `{DISK_LIMIT}`.
    pub disk_limit_bytes: u64,
    // Rendered as additional bind-mount stanzas in `{EXTRA_MOUNTS}`.
    pub extra_mounts: Vec<MountConfig>,
}
|
||||
|
||||
/// One extra bind mount, rendered by `render_config` as a `mount { ... }`
/// stanza with `is_bind: true`.
pub struct MountConfig {
    // Written to the stanza's `src` field.
    pub src: String,
    // Written to the stanza's `dst` field.
    pub dst: String,
    // Written to the stanza's `rw` field.
    pub rw: bool,
}
|
||||
|
||||
/// Program used to launch nsjail: the `NSJAIL_PATH` environment variable if
/// it can be read, otherwise plain `nsjail`.
fn nsjail_path() -> String {
    match std::env::var("NSJAIL_PATH") {
        Ok(configured) => configured,
        Err(_) => String::from("nsjail"),
    }
}
|
||||
|
||||
/// Produce the nsjail config text for a sandbox by filling the placeholders
/// of `NSJAIL_CONFIG_TEMPLATE`.
///
/// Each `{PLACEHOLDER}` is substituted with a plain `str::replace`, applied in
/// the order written below. Values are inserted verbatim, with no quoting or
/// escaping.
pub fn render_config(config: &NsjailConfig) -> String {
    // One `mount { ... }` stanza per extra bind mount, separated by blank lines.
    let extra_mounts = config
        .extra_mounts
        .iter()
        .map(|m| {
            format!(
                "mount {{\n src: \"{}\"\n dst: \"{}\"\n is_bind: true\n rw: {}\n}}",
                m.src, m.dst, m.rw
            )
        })
        .collect::<Vec<_>>()
        .join("\n\n");

    NSJAIL_CONFIG_TEMPLATE
        .replace("{SANDBOX_DIR}", &config.sandbox_dir)
        // NOTE: the template receives the negation of `config.clone_newnet`.
        .replace("{CLONE_NEWNET}", &(!config.clone_newnet).to_string())
        .replace("{CLONE_NEWUSER}", &config.clone_newuser.to_string())
        .replace("{MEMORY_LIMIT}", &config.memory_limit_bytes.to_string())
        .replace("{CPU_MS_PER_SEC}", &config.cpu_ms_per_sec.to_string())
        .replace("{DISK_LIMIT}", &config.disk_limit_bytes.to_string())
        // Same negated flag as `{CLONE_NEWNET}`.
        .replace("{IFACE_NO_LO}", &(!config.clone_newnet).to_string())
        .replace("{EXTRA_MOUNTS}", &extra_mounts)
}
|
||||
|
||||
pub async fn spawn_nsjail(
|
||||
config_path: &str,
|
||||
env_vars: &[(String, String)],
|
||||
) -> Result<tokio::process::Child> {
|
||||
let mut cmd = Command::new(nsjail_path());
|
||||
cmd.arg("--config")
|
||||
.arg(config_path)
|
||||
.arg("--")
|
||||
.arg("/bin/bash")
|
||||
.arg("--login")
|
||||
.stdin(Stdio::null())
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.kill_on_drop(true);
|
||||
|
||||
for (k, v) in env_vars {
|
||||
cmd.env(k, v);
|
||||
}
|
||||
|
||||
cmd.spawn()
|
||||
.map_err(|e| Error::InternalErr(format!("Failed to spawn nsjail: {e}")))
|
||||
}
|
||||
|
||||
/// Run `command` through `/bin/bash -c` after entering the mount and PID
/// namespaces of process `child_pid` with `nsenter`.
///
/// Returns `(exit_code, stdout, stderr)`. Output is decoded lossily as UTF-8
/// and the exit code is `-1` when the process reports none.
///
/// # Errors
/// * `Error::InternalErr` if `nsenter` cannot be spawned or waited on.
/// * `Error::ExecutionErr` if the command does not finish within
///   `timeout_secs` (default 300 seconds).
pub async fn exec_in_sandbox(
    child_pid: u32,
    command: &str,
    cwd: Option<&str>,
    env_vars: &[(String, String)],
    timeout_secs: Option<u32>,
) -> Result<(i32, String, String)> {
    let mut cmd = Command::new("nsenter");
    cmd.arg("--target")
        .arg(child_pid.to_string())
        .arg("--mount")
        .arg("--pid")
        .arg("--")
        .arg("/bin/bash")
        .arg("-c")
        .arg(command)
        .stdin(Stdio::null())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .kill_on_drop(true);

    // NOTE(review): `current_dir` applies to the spawned `nsenter` process
    // itself, i.e. presumably it is resolved before the namespaces are
    // entered — confirm this yields the intended directory inside the sandbox.
    if let Some(cwd) = cwd {
        cmd.current_dir(cwd);
    }
    for (k, v) in env_vars {
        cmd.env(k, v);
    }

    let timeout = timeout_secs.unwrap_or(300);

    let child = cmd
        .spawn()
        .map_err(|e| Error::InternalErr(format!("Failed to exec nsenter: {e}")))?;

    // Outer error = timeout elapsed, inner error = waiting on the child failed.
    // On timeout the wait future (which owns the child) is dropped; with
    // `kill_on_drop(true)` that presumably kills the process — TODO confirm.
    let result = tokio::time::timeout(
        std::time::Duration::from_secs(timeout as u64),
        child.wait_with_output(),
    )
    .await
    .map_err(|_| Error::ExecutionErr(format!("Command timed out after {timeout}s")))?
    .map_err(|e| Error::InternalErr(format!("Failed to wait for nsenter: {e}")))?;

    let exit_code = result.status.code().unwrap_or(-1);
    let stdout = String::from_utf8_lossy(&result.stdout).to_string();
    let stderr = String::from_utf8_lossy(&result.stderr).to_string();

    Ok((exit_code, stdout, stderr))
}
|
||||
|
||||
pub fn send_signal(pid: u32, signal: nix::sys::signal::Signal) -> Result<()> {
|
||||
nix::sys::signal::kill(
|
||||
nix::unistd::Pid::from_raw(pid as i32),
|
||||
signal,
|
||||
)
|
||||
.map_err(|e| Error::InternalErr(format!("Failed to send signal {signal} to PID {pid}: {e}")))
|
||||
}
|
||||
|
||||
pub fn write_to_sandbox(sandbox_dir: &str, file_path: &str, content: &[u8]) -> Result<()> {
|
||||
let full_path = Path::new(sandbox_dir).join(
|
||||
file_path
|
||||
.strip_prefix('/')
|
||||
.unwrap_or(file_path),
|
||||
);
|
||||
if let Some(parent) = full_path.parent() {
|
||||
std::fs::create_dir_all(parent)
|
||||
.map_err(|e| Error::InternalErr(format!("Failed to create directory: {e}")))?;
|
||||
}
|
||||
std::fs::write(&full_path, content)
|
||||
.map_err(|e| Error::InternalErr(format!("Failed to write file: {e}")))
|
||||
}
|
||||
|
||||
pub fn read_from_sandbox(sandbox_dir: &str, file_path: &str) -> Result<String> {
|
||||
let full_path = Path::new(sandbox_dir).join(
|
||||
file_path
|
||||
.strip_prefix('/')
|
||||
.unwrap_or(file_path),
|
||||
);
|
||||
std::fs::read_to_string(&full_path)
|
||||
.map_err(|e| Error::NotFound(format!("File not found or unreadable: {e}")))
|
||||
}
|
||||
136
backend/windmill-sandbox/src/types.rs
Normal file
136
backend/windmill-sandbox/src/types.rs
Normal file
@@ -0,0 +1,136 @@
|
||||
use chrono::{DateTime, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Lifecycle state of a sandbox.
///
/// Maps to the `sandbox_status` SQL type and is serialized with lowercase
/// variant names in both SQL and JSON.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, sqlx::Type)]
#[sqlx(type_name = "sandbox_status", rename_all = "lowercase")]
#[serde(rename_all = "lowercase")]
pub enum SandboxStatus {
    Creating,
    Running,
    Suspended,
    Stopped,
    Error,
}
|
||||
|
||||
impl std::fmt::Display for SandboxStatus {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
SandboxStatus::Creating => write!(f, "creating"),
|
||||
SandboxStatus::Running => write!(f, "running"),
|
||||
SandboxStatus::Suspended => write!(f, "suspended"),
|
||||
SandboxStatus::Stopped => write!(f, "stopped"),
|
||||
SandboxStatus::Error => write!(f, "error"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Parameters for creating a sandbox.
///
/// Every field is optional on deserialization: either it is an `Option`, or
/// it has a serde default. `None` options are omitted when serializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SandboxConfig {
    // Defaults to `SandboxMode::Embedded`.
    #[serde(default)]
    pub mode: SandboxMode,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub image: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timeout_secs: Option<i32>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub idle_timeout_secs: Option<i32>,
    // Defaults to 1.
    #[serde(default = "default_cpu_limit")]
    pub cpu_limit: i32,
    // Defaults to 512.
    #[serde(default = "default_memory_limit_mb")]
    pub memory_limit_mb: i32,
    // Defaults to 1024.
    #[serde(default = "default_disk_limit_mb")]
    pub disk_limit_mb: i32,
    // Free-form JSON; defaults to JSON null when absent.
    #[serde(default)]
    pub env_vars: serde_json::Value,
    #[serde(default)]
    pub labels: serde_json::Value,
    #[serde(default)]
    pub mounts: serde_json::Value,
    // Defaults to false.
    #[serde(default)]
    pub network_enabled: bool,
    // Defaults to false.
    #[serde(default)]
    pub ephemeral: bool,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub auto_stop_after_secs: Option<i32>,
}
|
||||
|
||||
/// Serde default for `SandboxConfig::cpu_limit`.
fn default_cpu_limit() -> i32 {
    1
}

/// Serde default for `SandboxConfig::memory_limit_mb`.
fn default_memory_limit_mb() -> i32 {
    512
}

/// Serde default for `SandboxConfig::disk_limit_mb`.
fn default_disk_limit_mb() -> i32 {
    1024
}
|
||||
|
||||
/// Sandbox mode, serialized with lowercase variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum SandboxMode {
    // Used when no mode is specified.
    #[default]
    Embedded,
    Remote,
}
|
||||
|
||||
impl std::fmt::Display for SandboxMode {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
match self {
|
||||
SandboxMode::Embedded => write!(f, "embedded"),
|
||||
SandboxMode::Remote => write!(f, "remote"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Description of a sandbox as exposed through the API.
///
/// Optional fields are omitted from the serialized form when `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SandboxInfo {
    pub id: Uuid,
    pub workspace_id: String,
    pub status: SandboxStatus,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub image: Option<String>,
    pub labels: serde_json::Value,
    // Kept as a plain string here rather than `SandboxMode`.
    pub mode: String,
    pub created_by: String,
    pub created_at: DateTime<Utc>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub started_at: Option<DateTime<Utc>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub last_activity_at: Option<DateTime<Utc>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub suspended_at: Option<DateTime<Utc>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub stopped_at: Option<DateTime<Utc>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error_message: Option<String>,
    pub ephemeral: bool,
    pub cpu_limit: i32,
    pub memory_limit_mb: i32,
    pub network_enabled: bool,
}
|
||||
|
||||
/// Request to run a command in a sandbox.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecRequest {
    // Command line to run.
    pub command: String,
    // Optional time limit in seconds.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub timeout_secs: Option<i32>,
    // Optional working directory.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cwd: Option<String>,
    // Optional extra environment.
    // NOTE(review): presumably a JSON object of string values — confirm against the consumer.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub env: Option<serde_json::Value>,
}
|
||||
|
||||
/// Outcome of a command executed in a sandbox.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecResult {
    pub exec_id: Uuid,
    pub exit_code: i32,
    pub stdout: String,
    pub stderr: String,
    // Duration of the execution in milliseconds.
    pub duration_ms: i64,
}
|
||||
|
||||
/// Request to write a text file into a sandbox.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WriteFileRequest {
    // Destination path of the file.
    pub path: String,
    // File content as a string.
    pub content: String,
}
|
||||
@@ -42,6 +42,7 @@ bedrock = ["dep:aws-sdk-bedrockruntime", "windmill-common/bedrock", "dep:aws-con
|
||||
|
||||
[dependencies]
|
||||
windmill-queue.workspace = true
|
||||
windmill-sandbox.workspace = true
|
||||
windmill-dep-map.workspace = true
|
||||
windmill-audit.workspace = true # there isn't really a reason for audit-worth actions to happen in the worker.
|
||||
windmill-common = { workspace = true, default-features = false }
|
||||
|
||||
95
backend/windmill-worker/nsjail/sandbox.config.proto
Normal file
95
backend/windmill-worker/nsjail/sandbox.config.proto
Normal file
@@ -0,0 +1,95 @@
|
||||
name: "windmill sandbox"
|
||||
|
||||
mode: LISTEN
|
||||
hostname: "sandbox"
|
||||
log_level: ERROR
|
||||
|
||||
disable_rl: true
|
||||
|
||||
cwd: "/workspace"
|
||||
|
||||
clone_newnet: {CLONE_NEWNET}
|
||||
clone_newuser: {CLONE_NEWUSER}
|
||||
|
||||
skip_setsid: true
|
||||
keep_caps: false
|
||||
keep_env: true
|
||||
mount_proc: true
|
||||
|
||||
mount {
|
||||
src: "/bin"
|
||||
dst: "/bin"
|
||||
is_bind: true
|
||||
}
|
||||
|
||||
mount {
|
||||
src: "/proc/self/fd"
|
||||
dst: "/dev/fd"
|
||||
is_symlink: true
|
||||
mandatory: false
|
||||
}
|
||||
|
||||
mount {
|
||||
src: "/lib"
|
||||
dst: "/lib"
|
||||
is_bind: true
|
||||
}
|
||||
|
||||
mount {
|
||||
src: "/lib64"
|
||||
dst: "/lib64"
|
||||
is_bind: true
|
||||
mandatory: false
|
||||
}
|
||||
|
||||
mount {
|
||||
src: "/usr"
|
||||
dst: "/usr"
|
||||
is_bind: true
|
||||
}
|
||||
|
||||
mount {
|
||||
src: "/etc"
|
||||
dst: "/etc"
|
||||
is_bind: true
|
||||
}
|
||||
|
||||
mount {
|
||||
src: "/dev/null"
|
||||
dst: "/dev/null"
|
||||
is_bind: true
|
||||
rw: true
|
||||
}
|
||||
|
||||
mount {
|
||||
src: "/dev/random"
|
||||
dst: "/dev/random"
|
||||
is_bind: true
|
||||
}
|
||||
|
||||
mount {
|
||||
src: "/dev/urandom"
|
||||
dst: "/dev/urandom"
|
||||
is_bind: true
|
||||
}
|
||||
|
||||
mount {
|
||||
src: "{SANDBOX_DIR}"
|
||||
dst: "/workspace"
|
||||
is_bind: true
|
||||
rw: true
|
||||
}
|
||||
|
||||
mount {
|
||||
dst: "/tmp"
|
||||
fstype: "tmpfs"
|
||||
rw: true
|
||||
options: "size={DISK_LIMIT}"
|
||||
}
|
||||
|
||||
iface_no_lo: {IFACE_NO_LO}
|
||||
|
||||
cgroup_mem_max: {MEMORY_LIMIT}
|
||||
cgroup_cpu_ms_per_sec: {CPU_MS_PER_SEC}
|
||||
|
||||
{EXTRA_MOUNTS}
|
||||
@@ -66,6 +66,8 @@ mod python_versions;
|
||||
pub mod result_processor;
|
||||
#[cfg(feature = "rust")]
|
||||
mod rust_executor;
|
||||
#[allow(dead_code)]
|
||||
mod sandbox_server;
|
||||
mod sanitized_sql_params;
|
||||
mod schema;
|
||||
pub mod sql_utils;
|
||||
|
||||
221
backend/windmill-worker/src/sandbox_server.rs
Normal file
221
backend/windmill-worker/src/sandbox_server.rs
Normal file
@@ -0,0 +1,221 @@
|
||||
use std::future::IntoFuture;
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::{
|
||||
extract::{Path, Query},
|
||||
routing::{get, post},
|
||||
Extension, Json, Router,
|
||||
};
|
||||
use serde::Deserialize;
|
||||
use tokio::net::TcpListener;
|
||||
use uuid::Uuid;
|
||||
use windmill_sandbox::{ExecRequest, ExecResult, SandboxConfig, SandboxManager};
|
||||
|
||||
/// Shared state of the embedded sandbox HTTP server, handed to every handler
/// through an axum `Extension`.
struct EmbeddedSandboxState {
    manager: SandboxManager,
}
|
||||
|
||||
/// Handle to a running embedded sandbox server, returned by
/// `start_embedded_sandbox_server`.
pub struct EmbeddedSandboxHandle {
    // Local TCP port the server listens on (127.0.0.1).
    port: u16,
    // Same state the HTTP handlers use; needed to clean up sandboxes on shutdown.
    state: Arc<EmbeddedSandboxState>,
    // Firing (or dropping) this sender ends the server task.
    shutdown_tx: tokio::sync::oneshot::Sender<()>,
}
|
||||
|
||||
impl EmbeddedSandboxHandle {
|
||||
pub fn url(&self) -> String {
|
||||
format!("http://127.0.0.1:{}", self.port)
|
||||
}
|
||||
|
||||
pub async fn shutdown(self) {
|
||||
self.state.manager.cleanup_all().await;
|
||||
let _ = self.shutdown_tx.send(());
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn start_embedded_sandbox_server(
|
||||
base_dir: String,
|
||||
) -> windmill_common::error::Result<EmbeddedSandboxHandle> {
|
||||
let manager = SandboxManager::new(base_dir);
|
||||
let state = Arc::new(EmbeddedSandboxState { manager });
|
||||
|
||||
let app = Router::new()
|
||||
.route("/sandbox/create", post(handle_create))
|
||||
.route("/sandbox/:id/exec", post(handle_exec))
|
||||
.route("/sandbox/:id/suspend", post(handle_suspend))
|
||||
.route("/sandbox/:id/resume", post(handle_resume))
|
||||
.route("/sandbox/:id/terminate", post(handle_terminate))
|
||||
.route("/sandbox/:id/status", get(handle_status))
|
||||
.route("/sandbox/:id/write_file", post(handle_write_file))
|
||||
.route("/sandbox/:id/read_file", get(handle_read_file))
|
||||
.route("/sandboxes", get(handle_list))
|
||||
.layer(Extension(state.clone()));
|
||||
|
||||
let listener = TcpListener::bind("127.0.0.1:0")
|
||||
.await
|
||||
.map_err(|e| {
|
||||
windmill_common::error::Error::InternalErr(format!(
|
||||
"Failed to bind embedded sandbox server: {e}"
|
||||
))
|
||||
})?;
|
||||
let port = listener.local_addr().unwrap().port();
|
||||
|
||||
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let server = axum::serve(listener, app);
|
||||
tokio::select! {
|
||||
result = server.into_future() => {
|
||||
if let Err(e) = result {
|
||||
tracing::error!("Embedded sandbox server error: {e}");
|
||||
}
|
||||
}
|
||||
_ = shutdown_rx => {
|
||||
tracing::debug!("Embedded sandbox server shutting down");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
tracing::info!("Embedded sandbox server started on port {port}");
|
||||
|
||||
Ok(EmbeddedSandboxHandle {
|
||||
port,
|
||||
state,
|
||||
shutdown_tx,
|
||||
})
|
||||
}
|
||||
|
||||
async fn handle_create(
|
||||
Extension(state): Extension<Arc<EmbeddedSandboxState>>,
|
||||
Json(config): Json<SandboxConfig>,
|
||||
) -> Result<Json<serde_json::Value>, axum::http::StatusCode> {
|
||||
match state.manager.create_sandbox(config).await {
|
||||
Ok((id, status)) => Ok(Json(serde_json::json!({
|
||||
"id": id,
|
||||
"status": status,
|
||||
}))),
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to create sandbox: {e}");
|
||||
Err(axum::http::StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_exec(
|
||||
Extension(state): Extension<Arc<EmbeddedSandboxState>>,
|
||||
Path(id): Path<Uuid>,
|
||||
Json(request): Json<ExecRequest>,
|
||||
) -> Result<Json<ExecResult>, axum::http::StatusCode> {
|
||||
match state.manager.exec(id, request).await {
|
||||
Ok(result) => Ok(Json(result)),
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to exec in sandbox {id}: {e}");
|
||||
Err(axum::http::StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_suspend(
|
||||
Extension(state): Extension<Arc<EmbeddedSandboxState>>,
|
||||
Path(id): Path<Uuid>,
|
||||
) -> Result<Json<serde_json::Value>, axum::http::StatusCode> {
|
||||
match state.manager.suspend(id).await {
|
||||
Ok(status) => Ok(Json(serde_json::json!({ "status": status }))),
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to suspend sandbox {id}: {e}");
|
||||
Err(axum::http::StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_resume(
|
||||
Extension(state): Extension<Arc<EmbeddedSandboxState>>,
|
||||
Path(id): Path<Uuid>,
|
||||
) -> Result<Json<serde_json::Value>, axum::http::StatusCode> {
|
||||
match state.manager.resume(id).await {
|
||||
Ok(status) => Ok(Json(serde_json::json!({ "status": status }))),
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to resume sandbox {id}: {e}");
|
||||
Err(axum::http::StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_terminate(
|
||||
Extension(state): Extension<Arc<EmbeddedSandboxState>>,
|
||||
Path(id): Path<Uuid>,
|
||||
) -> Result<Json<serde_json::Value>, axum::http::StatusCode> {
|
||||
match state.manager.terminate(id).await {
|
||||
Ok(status) => Ok(Json(serde_json::json!({ "status": status }))),
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to terminate sandbox {id}: {e}");
|
||||
Err(axum::http::StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_status(
|
||||
Extension(state): Extension<Arc<EmbeddedSandboxState>>,
|
||||
Path(id): Path<Uuid>,
|
||||
) -> Result<Json<serde_json::Value>, axum::http::StatusCode> {
|
||||
match state.manager.status(id).await {
|
||||
Ok(status) => Ok(Json(serde_json::json!({ "id": id, "status": status }))),
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to get sandbox {id} status: {e}");
|
||||
Err(axum::http::StatusCode::NOT_FOUND)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// JSON body of `POST /sandbox/:id/write_file`.
#[derive(Deserialize)]
struct WriteFileBody {
    // Destination path, passed to the manager's `write_file` unchanged.
    path: String,
    // Text content; written as its UTF-8 bytes.
    content: String,
}
|
||||
|
||||
async fn handle_write_file(
|
||||
Extension(state): Extension<Arc<EmbeddedSandboxState>>,
|
||||
Path(id): Path<Uuid>,
|
||||
Json(body): Json<WriteFileBody>,
|
||||
) -> Result<Json<serde_json::Value>, axum::http::StatusCode> {
|
||||
match state
|
||||
.manager
|
||||
.write_file(id, &body.path, body.content.as_bytes())
|
||||
.await
|
||||
{
|
||||
Ok(()) => Ok(Json(serde_json::json!({ "ok": true }))),
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to write file to sandbox {id}: {e}");
|
||||
Err(axum::http::StatusCode::INTERNAL_SERVER_ERROR)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Query string of `GET /sandbox/:id/read_file`.
#[derive(Deserialize)]
struct ReadFileQuery {
    // Path of the file to read, passed to the manager's `read_file` unchanged.
    path: String,
}
|
||||
|
||||
async fn handle_read_file(
|
||||
Extension(state): Extension<Arc<EmbeddedSandboxState>>,
|
||||
Path(id): Path<Uuid>,
|
||||
Query(query): Query<ReadFileQuery>,
|
||||
) -> Result<String, axum::http::StatusCode> {
|
||||
match state.manager.read_file(id, &query.path).await {
|
||||
Ok(content) => Ok(content),
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to read file from sandbox {id}: {e}");
|
||||
Err(axum::http::StatusCode::NOT_FOUND)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_list(
|
||||
Extension(state): Extension<Arc<EmbeddedSandboxState>>,
|
||||
) -> Json<serde_json::Value> {
|
||||
let sandboxes = state.manager.list_sandboxes().await;
|
||||
let list: Vec<serde_json::Value> = sandboxes
|
||||
.into_iter()
|
||||
.map(|(id, status)| serde_json::json!({ "id": id, "status": status }))
|
||||
.collect();
|
||||
Json(serde_json::json!(list))
|
||||
}
|
||||
@@ -2352,7 +2352,226 @@ def parse_sql_client_name(name: str) -> tuple[str, Optional[str]]:
|
||||
name = name
|
||||
schema = None
|
||||
if ":" in name:
|
||||
name, schema = name.split(":", 1)
|
||||
name, schema = name.split(":", 1)
|
||||
if not name:
|
||||
name = "main"
|
||||
return name, schema
|
||||
return name, schema
|
||||
|
||||
|
||||
# --- Sandbox SDK ---
|
||||
|
||||
|
||||
class SandboxExecResult:
    """Result of one command execution inside a sandbox.

    Populated from a response dictionary. Keys that are absent fall back to
    a default: empty string for ``exec_id``/``stdout``/``stderr``, ``-1`` for
    ``exit_code`` and ``0`` for ``duration_ms``.
    """

    def __init__(self, data: dict):
        """Copy the known fields out of *data*, applying defaults."""
        lookup = data.get
        self.exec_id: str = lookup("exec_id", "")
        self.exit_code: int = lookup("exit_code", -1)
        self.stdout: str = lookup("stdout", "")
        self.stderr: str = lookup("stderr", "")
        self.duration_ms: int = lookup("duration_ms", 0)

    def __repr__(self):
        """Short summary showing only the exit code and duration."""
        return f"SandboxExecResult(exit_code={self.exit_code}, duration_ms={self.duration_ms})"
|
||||
|
||||
|
||||
class SandboxInfo:
    """Snapshot of a sandbox's metadata as returned by the server.

    ``id`` and ``status`` are mandatory keys of the input dictionary (a
    missing one raises ``KeyError``); every other field is optional and
    falls back to the default shown in ``__init__``.
    """

    def __init__(self, data: dict):
        """Copy the known fields out of *data*, applying defaults."""
        lookup = data.get

        # Required fields.
        self.id: str = data["id"]
        self.workspace_id: str = lookup("workspace_id", "")
        self.status: str = data["status"]

        # Descriptive fields.
        self.image: Optional[str] = lookup("image")
        self.labels: dict = lookup("labels", {})
        self.mode: str = lookup("mode", "embedded")
        self.created_by: str = lookup("created_by", "")

        # Lifecycle timestamps, kept as the strings the server sent.
        self.created_at: str = lookup("created_at", "")
        self.started_at: Optional[str] = lookup("started_at")
        self.last_activity_at: Optional[str] = lookup("last_activity_at")
        self.suspended_at: Optional[str] = lookup("suspended_at")
        self.stopped_at: Optional[str] = lookup("stopped_at")

        self.error_message: Optional[str] = lookup("error_message")

        # Resource and behavior settings.
        self.ephemeral: bool = lookup("ephemeral", False)
        self.cpu_limit: int = lookup("cpu_limit", 1)
        self.memory_limit_mb: int = lookup("memory_limit_mb", 512)
        self.network_enabled: bool = lookup("network_enabled", False)

    def __repr__(self):
        """Short summary showing only the id and status."""
        return f"SandboxInfo(id={self.id}, status={self.status})"
|
||||
|
||||
|
||||
class Sandbox:
    """Client-side handle for a single Windmill sandbox.

    A handle reaches its sandbox in one of two ways, fixed at construction:

    * direct HTTP (``use_direct_http=True``): requests are sent with
      ``httpx`` straight to ``{base_url}/sandbox/...``;
    * Windmill API (``use_direct_http=False``): requests go through the
      module-level ``_client`` (or a freshly constructed ``Windmill()``)
      under ``/w/{workspace}/sandboxes/...``.

    Return annotations are written as strings: the class defines a ``list``
    method, so a bare ``list[...]`` annotation evaluated later in the class
    body would resolve to that method instead of the builtin.
    """

    def __init__(self, sandbox_id: str, base_url: str, use_direct_http: bool, token: str = "", workspace: str = ""):
        """Store the sandbox id and connection settings; performs no I/O.

        ``token`` and ``workspace`` are kept on the instance; none of the
        methods in this class currently read them.
        """
        self.id = sandbox_id
        self._base_url = base_url
        self._use_direct_http = use_direct_http
        self._token = token
        self._workspace = workspace

    @staticmethod
    def create(
        mode: str = "embedded",
        image: Optional[str] = None,
        timeout_secs: Optional[int] = None,
        idle_timeout_secs: Optional[int] = None,
        cpu_limit: int = 1,
        memory_limit_mb: int = 512,
        disk_limit_mb: int = 1024,
        env_vars: Optional[Dict[str, str]] = None,
        labels: Optional[Dict[str, str]] = None,
        network_enabled: bool = False,
        ephemeral: bool = False,
    ) -> "Sandbox":
        """Create a new sandbox and return a handle to it.

        When ``mode`` is ``"embedded"`` and the ``WM_SANDBOX_URL``
        environment variable is set, the request is posted directly to
        ``{WM_SANDBOX_URL}/sandbox/create`` and the returned handle uses
        direct HTTP. Otherwise the sandbox is created through the Windmill
        API for the current client's workspace.

        ``image``, ``timeout_secs`` and ``idle_timeout_secs`` are only
        included in the request when they are not ``None``.

        Raises:
            httpx.HTTPStatusError: on a non-success response in the direct
                HTTP path.
        """
        config = {
            "mode": mode,
            "cpu_limit": cpu_limit,
            "memory_limit_mb": memory_limit_mb,
            "disk_limit_mb": disk_limit_mb,
            "env_vars": env_vars or {},
            "labels": labels or {},
            "mounts": [],
            "network_enabled": network_enabled,
            "ephemeral": ephemeral,
        }
        if image is not None:
            config["image"] = image
        if timeout_secs is not None:
            config["timeout_secs"] = timeout_secs
        if idle_timeout_secs is not None:
            config["idle_timeout_secs"] = idle_timeout_secs

        sandbox_url = os.environ.get("WM_SANDBOX_URL")
        if mode == "embedded" and sandbox_url:
            resp = httpx.post(f"{sandbox_url}/sandbox/create", json=config, timeout=30)
            resp.raise_for_status()
            data = resp.json()
            return Sandbox(data["id"], sandbox_url, True)

        wm = _client or Windmill()
        data = wm.post(
            f"/w/{wm.workspace}/sandboxes/create",
            json=config,
        ).json()
        return Sandbox(
            data["id"],
            wm.base_url,
            False,
            wm.token,
            wm.workspace,
        )

    @staticmethod
    def get(sandbox_id: str) -> "Sandbox":
        """Return a handle for an existing sandbox without contacting it.

        If ``WM_SANDBOX_URL`` is set the handle uses direct HTTP against
        that URL; otherwise it is bound to the current Windmill client.
        """
        sandbox_url = os.environ.get("WM_SANDBOX_URL")
        if sandbox_url:
            return Sandbox(sandbox_id, sandbox_url, True)

        wm = _client or Windmill()
        return Sandbox(sandbox_id, wm.base_url, False, wm.token, wm.workspace)

    @staticmethod
    def list(
        status: Optional[list[str]] = None,
        labels: Optional[Dict[str, str]] = None,
    ) -> "list[SandboxInfo]":
        """List sandboxes of the current workspace through the Windmill API.

        Args:
            status: optional statuses to filter on; sent as one
                comma-separated ``status`` query parameter.
            labels: optional label filter. Only the first key/value pair is
                sent (as ``label_key`` / ``label_value``); any further
                entries are ignored.
        """
        wm = _client or Windmill()
        params: Dict[str, str] = {}
        if status:
            params["status"] = ",".join(status)
        if labels:
            first_key = next(iter(labels))
            params["label_key"] = first_key
            params["label_value"] = labels[first_key]
        resp = wm.get(f"/w/{wm.workspace}/sandboxes/list", params=params).json()
        return [SandboxInfo(s) for s in resp]

    def exec(
        self,
        command: str,
        *,
        timeout_secs: Optional[int] = None,
        cwd: Optional[str] = None,
        env: Optional[Dict[str, str]] = None,
    ) -> "SandboxExecResult":
        """Run ``command`` in the sandbox and return its result.

        ``timeout_secs``, ``cwd`` and ``env`` are only sent when not
        ``None``.

        Raises:
            httpx.HTTPStatusError: on a non-success response in the direct
                HTTP path.
        """
        payload: Dict[str, Any] = {"command": command}
        if timeout_secs is not None:
            payload["timeout_secs"] = timeout_secs
        if cwd is not None:
            payload["cwd"] = cwd
        if env is not None:
            payload["env"] = env

        if self._use_direct_http:
            resp = httpx.post(
                f"{self._base_url}/sandbox/{self.id}/exec",
                json=payload,
                # Give the HTTP call 10s of slack beyond the command timeout
                # (300s when none was requested); connecting gets 10s.
                timeout=httpx.Timeout((timeout_secs or 300) + 10, connect=10),
            )
            resp.raise_for_status()
            return SandboxExecResult(resp.json())

        wm = _client or Windmill()
        data = wm.post(f"/w/{wm.workspace}/sandboxes/{self.id}/exec", json=payload).json()
        return SandboxExecResult(data)

    def suspend(self) -> None:
        """Send the ``suspend`` action for this sandbox."""
        self._action("suspend")

    def resume(self) -> None:
        """Send the ``resume`` action for this sandbox."""
        self._action("resume")

    def terminate(self) -> None:
        """Send the ``terminate`` action for this sandbox."""
        self._action("terminate")

    def status(self) -> "SandboxInfo":
        """Fetch the sandbox's current info.

        Raises:
            httpx.HTTPStatusError: on a non-success response in the direct
                HTTP path.
        """
        if self._use_direct_http:
            resp = httpx.get(f"{self._base_url}/sandbox/{self.id}/status", timeout=10)
            resp.raise_for_status()
            return SandboxInfo(resp.json())

        wm = _client or Windmill()
        data = wm.get(f"/w/{wm.workspace}/sandboxes/{self.id}").json()
        return SandboxInfo(data)

    def write_file(self, path: str, content: Union[str, bytes]) -> None:
        """Write ``content`` to ``path`` inside the sandbox.

        ``bytes`` content is decoded as UTF-8 before sending, because the
        request body is JSON; bytes that are not valid UTF-8 therefore raise
        ``UnicodeDecodeError``.

        Raises:
            httpx.HTTPStatusError: on a non-success response in the direct
                HTTP path.
        """
        if isinstance(content, bytes):
            content = content.decode("utf-8")

        if self._use_direct_http:
            resp = httpx.post(
                f"{self._base_url}/sandbox/{self.id}/write_file",
                json={"path": path, "content": content},
                timeout=30,
            )
            resp.raise_for_status()
            return

        wm = _client or Windmill()
        wm.post(f"/w/{wm.workspace}/sandboxes/{self.id}/write_file", json={"path": path, "content": content})

    def read_file(self, path: str) -> str:
        """Return the contents of ``path`` inside the sandbox.

        Raises:
            httpx.HTTPStatusError: on a non-success response in the direct
                HTTP path.
        """
        if self._use_direct_http:
            resp = httpx.get(
                f"{self._base_url}/sandbox/{self.id}/read_file",
                params={"path": path},
                timeout=30,
            )
            resp.raise_for_status()
            # ``Response.text`` is a property in httpx; calling it would
            # raise ``TypeError: 'str' object is not callable``.
            return resp.text

        wm = _client or Windmill()
        # NOTE(review): this path decodes the body as JSON, i.e. it assumes
        # the API returns a JSON-encoded string - confirm against the server.
        return wm.get(f"/w/{wm.workspace}/sandboxes/{self.id}/read_file", params={"path": path}).json()

    def list_execs(self) -> "list[SandboxExecResult]":
        """List the executions recorded for this sandbox.

        Always goes through the Windmill API, even for direct-HTTP handles.
        """
        wm = _client or Windmill()
        data = wm.get(f"/w/{wm.workspace}/sandboxes/{self.id}/execs").json()
        return [SandboxExecResult(e) for e in data]

    def _action(self, action: str) -> None:
        """POST a lifecycle ``action`` (e.g. ``"suspend"``) for this sandbox.

        Raises:
            httpx.HTTPStatusError: on a non-success response in the direct
                HTTP path.
        """
        if self._use_direct_http:
            resp = httpx.post(
                f"{self._base_url}/sandbox/{self.id}/{action}",
                timeout=30,
            )
            resp.raise_for_status()
            return

        wm = _client or Windmill()
        wm.post(f"/w/{wm.workspace}/sandboxes/{self.id}/{action}", json={})
|
||||
Reference in New Issue
Block a user