Compare commits

...

2 Commits

Author SHA1 Message Date
Ruben Fiszel
741526b7b8 experiment: Add full flow executor with branch/loop support
Add flow_executor.rs that supports executing complex flows in local mode:
- ForloopFlow: iterate over arrays/ranges with sequential execution
- WhileloopFlow: execute modules while condition is true
- BranchOne: if/else branching based on conditions
- BranchAll: parallel branch execution (sequential for now)
- RawScript: inline script execution (bash, python, deno, bun)
- Identity: pass-through module

Key features:
- Uses windmill-common FlowValue types for compatibility
- Expression evaluation for input transforms with comparisons
- Proper flow status tracking with module results
- Recursive async execution with async_recursion crate

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-23 07:14:59 +00:00
Ruben Fiszel
73deef44ed experiment: Add windmill-local crate with libSQL/Turso support
This experimental crate demonstrates running Windmill preview endpoints
with libSQL (SQLite/Turso) instead of PostgreSQL. Key features:

- Schema: SQLite-compatible schema for jobs, queue, and results
  - ENUMs → TEXT with CHECK constraints
  - JSONB → JSON (TEXT)
  - Arrays → JSON arrays
  - No FOR UPDATE SKIP LOCKED (single worker, mutex coordination)

- Database: Supports three modes via libsql crate:
  - In-memory SQLite (for testing)
  - File-based SQLite (local persistence)
  - Remote Turso (multi-writer scenarios)

- API: Compatible preview endpoints:
  - POST /api/w/{workspace}/jobs/run/preview
  - POST /api/w/{workspace}/jobs/run_wait_result/preview
  - POST /api/w/{workspace}/jobs/run/preview_flow
  - POST /api/w/{workspace}/jobs/run_wait_result/preview_flow

- Executor: Simple script execution for bash, python3, deno, bun

- Worker: Single embedded worker that processes queue

Run with: cargo run -p windmill-local --example local_server

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-23 07:01:44 +00:00
14 changed files with 3598 additions and 40 deletions

587
backend/Cargo.lock generated
View File

@@ -125,7 +125,7 @@ dependencies = [
"getrandom 0.3.4",
"once_cell",
"version_check",
"zerocopy",
"zerocopy 0.8.33",
]
[[package]]
@@ -1302,6 +1302,34 @@ dependencies = [
"tracing",
]
[[package]]
name = "axum"
version = "0.6.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf"
dependencies = [
"async-trait",
"axum-core 0.3.4",
"bitflags 1.3.2",
"bytes",
"futures-util",
"http 0.2.12",
"http-body 0.4.6",
"hyper 0.14.32",
"itoa",
"matchit 0.7.3",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"rustversion",
"serde",
"sync_wrapper 0.1.2",
"tower 0.4.13",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum"
version = "0.7.9"
@@ -1330,7 +1358,7 @@ dependencies = [
"serde_json",
"serde_path_to_error",
"serde_urlencoded",
"sync_wrapper",
"sync_wrapper 1.0.2",
"tokio",
"tower 0.5.3",
"tower-layer",
@@ -1364,7 +1392,7 @@ dependencies = [
"serde_json",
"serde_path_to_error",
"serde_urlencoded",
"sync_wrapper",
"sync_wrapper 1.0.2",
"tokio",
"tower 0.5.3",
"tower-layer",
@@ -1372,6 +1400,23 @@ dependencies = [
"tracing",
]
[[package]]
name = "axum-core"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c"
dependencies = [
"async-trait",
"bytes",
"futures-util",
"http 0.2.12",
"http-body 0.4.6",
"mime",
"rustversion",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.4.5"
@@ -1387,7 +1432,7 @@ dependencies = [
"mime",
"pin-project-lite",
"rustversion",
"sync_wrapper",
"sync_wrapper 1.0.2",
"tower-layer",
"tower-service",
"tracing",
@@ -1406,7 +1451,7 @@ dependencies = [
"http-body-util",
"mime",
"pin-project-lite",
"sync_wrapper",
"sync_wrapper 1.0.2",
"tower-layer",
"tower-service",
"tracing",
@@ -1541,6 +1586,29 @@ dependencies = [
"serde",
]
[[package]]
name = "bindgen"
version = "0.66.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f2b84e06fc203107bfbad243f4aba2af864eb7db3b1cf46ea0a023b0b433d2a7"
dependencies = [
"bitflags 2.9.4",
"cexpr",
"clang-sys",
"lazy_static",
"lazycell",
"log",
"peeking_take_while",
"prettyplease",
"proc-macro2",
"quote",
"regex",
"rustc-hash 1.1.0",
"shlex",
"syn 2.0.114",
"which 4.4.2",
]
[[package]]
name = "bindgen"
version = "0.69.5"
@@ -1827,7 +1895,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0686c856aa6aac0c4498f936d7d6a02df690f614c03e4d906d1018062b5c5e2c"
dependencies = [
"once_cell",
"proc-macro-crate",
"proc-macro-crate 3.4.0",
"proc-macro2",
"quote",
"syn 2.0.114",
@@ -3889,7 +3957,7 @@ dependencies = [
"tokio-socks",
"tokio-util",
"tower 0.5.3",
"tower-http",
"tower-http 0.6.8",
"tower-service",
]
@@ -4031,7 +4099,7 @@ dependencies = [
"http-body-util",
"log",
"num-bigint",
"prost",
"prost 0.13.5",
"prost-build",
"rand 0.8.5",
"rusqlite",
@@ -4654,7 +4722,7 @@ dependencies = [
"deno_error",
"futures",
"num-bigint",
"prost",
"prost 0.13.5",
"serde",
"uuid",
]
@@ -4674,7 +4742,7 @@ dependencies = [
"futures",
"http 1.4.0",
"log",
"prost",
"prost 0.13.5",
"rand 0.8.5",
"serde",
"serde_json",
@@ -6340,7 +6408,7 @@ dependencies = [
"thiserror 1.0.69",
"tokio",
"tokio-retry2",
"tonic",
"tonic 0.12.3",
"tower 0.4.13",
"tracing",
]
@@ -6351,9 +6419,9 @@ version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "886aa8ec755382a1fdf4651f6e6ec01f2f3bf49f2cb0f068b9a74cafd574a715"
dependencies = [
"prost",
"prost 0.13.5",
"prost-types",
"tonic",
"tonic 0.12.3",
]
[[package]]
@@ -6516,7 +6584,7 @@ dependencies = [
"num-traits",
"rand 0.9.0",
"rand_distr 0.5.1",
"zerocopy",
"zerocopy 0.8.33",
]
[[package]]
@@ -6576,6 +6644,15 @@ dependencies = [
"syn 2.0.114",
]
[[package]]
name = "hashlink"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7"
dependencies = [
"hashbrown 0.14.5",
]
[[package]]
name = "hashlink"
version = "0.9.1"
@@ -6858,6 +6935,12 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "http-range-header"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "add0ab9360ddbd88cfeb3bd9574a1d85cfdfa14db10b3e21d3700dbc4328758f"
[[package]]
name = "httparse"
version = "1.10.1"
@@ -7006,6 +7089,24 @@ dependencies = [
"tokio-rustls 0.24.1",
]
[[package]]
name = "hyper-rustls"
version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "399c78f9338483cb7e630c8474b07268983c6bd5acee012e4211f9f7bb21b070"
dependencies = [
"futures-util",
"http 0.2.12",
"hyper 0.14.32",
"log",
"rustls 0.22.4",
"rustls-native-certs 0.7.3",
"rustls-pki-types",
"tokio",
"tokio-rustls 0.25.0",
"webpki-roots 0.26.11",
]
[[package]]
name = "hyper-rustls"
version = "0.26.0"
@@ -7044,6 +7145,18 @@ dependencies = [
"webpki-roots 1.0.5",
]
[[package]]
name = "hyper-timeout"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1"
dependencies = [
"hyper 0.14.32",
"pin-project-lite",
"tokio",
"tokio-io-timeout",
]
[[package]]
name = "hyper-timeout"
version = "0.5.2"
@@ -7777,7 +7890,7 @@ dependencies = [
"hyper 1.8.1",
"hyper-http-proxy",
"hyper-rustls 0.27.7",
"hyper-timeout",
"hyper-timeout 0.5.2",
"hyper-util",
"jsonpath-rust",
"k8s-openapi",
@@ -7792,7 +7905,7 @@ dependencies = [
"tokio",
"tokio-util",
"tower 0.5.3",
"tower-http",
"tower-http 0.6.8",
"tracing",
]
@@ -8036,6 +8149,142 @@ dependencies = [
"redox_syscall 0.7.0",
]
[[package]]
name = "libsql"
version = "0.9.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2329faffc510cc3c6b4f00169a39177cc7099d3ed7647fc92f7cf26e53a8d976"
dependencies = [
"anyhow",
"async-stream",
"async-trait",
"base64 0.21.7",
"bincode",
"bitflags 2.9.4",
"bytes",
"chrono",
"crc32fast",
"fallible-iterator 0.3.0",
"futures",
"http 0.2.12",
"hyper 0.14.32",
"hyper-rustls 0.25.0",
"libsql-hrana",
"libsql-sqlite3-parser",
"libsql-sys",
"libsql_replication",
"parking_lot",
"serde",
"serde_json",
"thiserror 1.0.69",
"tokio",
"tokio-stream",
"tokio-util",
"tonic 0.11.0",
"tonic-web",
"tower 0.4.13",
"tower-http 0.4.4",
"tracing",
"uuid",
"zerocopy 0.7.35",
]
[[package]]
name = "libsql-ffi"
version = "0.9.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6cd1c1662822495393327856774f6803be25d85bfdcd5b9d4af35458f5daaf75"
dependencies = [
"bindgen 0.66.1",
"cc",
"cmake",
"glob",
]
[[package]]
name = "libsql-hrana"
version = "0.9.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "646d0aa75e412769018422f0da798f72e93bd51964f0b2ddad4317aa779ae444"
dependencies = [
"base64 0.21.7",
"bytes",
"prost 0.12.6",
"serde",
]
[[package]]
name = "libsql-rusqlite"
version = "0.9.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a4ce3a78c6e3c2b23b02ab6272df8340e1c53380497979d456882254f348d5f"
dependencies = [
"bitflags 2.9.4",
"fallible-iterator 0.2.0",
"fallible-streaming-iterator",
"hashlink 0.8.4",
"libsql-ffi",
"smallvec",
]
[[package]]
name = "libsql-sqlite3-parser"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "15a90128c708356af8f7d767c9ac2946692c9112b4f74f07b99a01a60680e413"
dependencies = [
"bitflags 2.9.4",
"cc",
"fallible-iterator 0.3.0",
"indexmap 2.11.1",
"log",
"memchr",
"phf 0.11.3",
"phf_codegen",
"phf_shared 0.11.3",
"uncased",
]
[[package]]
name = "libsql-sys"
version = "0.9.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a3c326fcfc36fe7578238d5ee6b58c529f8c76372acd61ec50267529cdaff95"
dependencies = [
"bytes",
"libsql-ffi",
"libsql-rusqlite",
"once_cell",
"tracing",
"zerocopy 0.7.35",
]
[[package]]
name = "libsql_replication"
version = "0.9.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d9a2e469ac8400659bd31f81a745908bcc5cb6b40be2f2ff8de90b15bec5501"
dependencies = [
"aes 0.8.3",
"async-stream",
"async-trait",
"bytes",
"cbc",
"libsql-rusqlite",
"libsql-sys",
"parking_lot",
"prost 0.12.6",
"serde",
"thiserror 1.0.69",
"tokio",
"tokio-stream",
"tokio-util",
"tonic 0.11.0",
"tracing",
"uuid",
"zerocopy 0.7.35",
]
[[package]]
name = "libsqlite3-sys"
version = "0.30.1"
@@ -8654,7 +8903,7 @@ dependencies = [
"darling 0.20.11",
"heck 0.5.0",
"num-bigint",
"proc-macro-crate",
"proc-macro-crate 3.4.0",
"proc-macro-error2",
"proc-macro2",
"quote",
@@ -9223,7 +9472,7 @@ version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ff32365de1b6743cb203b710788263c44a03de03802daf96092f2da4fe6ba4d7"
dependencies = [
"proc-macro-crate",
"proc-macro-crate 3.4.0",
"proc-macro2",
"quote",
"syn 2.0.114",
@@ -9554,11 +9803,11 @@ dependencies = [
"opentelemetry-http",
"opentelemetry-proto 0.27.0",
"opentelemetry_sdk 0.27.1",
"prost",
"prost 0.13.5",
"serde_json",
"thiserror 1.0.69",
"tokio",
"tonic",
"tonic 0.12.3",
"tracing",
]
@@ -9571,9 +9820,9 @@ dependencies = [
"hex",
"opentelemetry 0.27.1",
"opentelemetry_sdk 0.27.1",
"prost",
"prost 0.13.5",
"serde",
"tonic",
"tonic 0.12.3",
]
[[package]]
@@ -9586,9 +9835,9 @@ dependencies = [
"hex",
"opentelemetry 0.29.1",
"opentelemetry_sdk 0.29.0",
"prost",
"prost 0.13.5",
"serde",
"tonic",
"tonic 0.12.3",
"tracing",
]
@@ -9874,6 +10123,12 @@ dependencies = [
"hmac",
]
[[package]]
name = "peeking_take_while"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19b17cddbe7ec3f8bc800887bab5e717348c95ea2ca0b1bf0837fb964dc67099"
[[package]]
name = "pem"
version = "1.1.1"
@@ -10041,6 +10296,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "67eabc2ef2a60eb7faa00097bd1ffdb5bd28e62bf39990626a582201b7a754e5"
dependencies = [
"siphasher 1.0.1",
"uncased",
]
[[package]]
@@ -10277,7 +10533,7 @@ version = "0.2.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9"
dependencies = [
"zerocopy",
"zerocopy 0.8.33",
]
[[package]]
@@ -10305,6 +10561,16 @@ dependencies = [
"elliptic-curve",
]
[[package]]
name = "proc-macro-crate"
version = "1.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f4c021e1093a56626774e81216a4ce732a735e5bad4868a03f3ed65ca0c3919"
dependencies = [
"once_cell",
"toml_edit 0.19.15",
]
[[package]]
name = "proc-macro-crate"
version = "3.4.0"
@@ -10451,6 +10717,16 @@ dependencies = [
"thiserror 2.0.18",
]
[[package]]
name = "prost"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29"
dependencies = [
"bytes",
"prost-derive 0.12.6",
]
[[package]]
name = "prost"
version = "0.13.5"
@@ -10458,7 +10734,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5"
dependencies = [
"bytes",
"prost-derive",
"prost-derive 0.13.5",
]
[[package]]
@@ -10474,13 +10750,26 @@ dependencies = [
"once_cell",
"petgraph",
"prettyplease",
"prost",
"prost 0.13.5",
"prost-types",
"regex",
"syn 2.0.114",
"tempfile",
]
[[package]]
name = "prost-derive"
version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81bddcdb20abf9501610992b6759a4c888aef7d1a7247ef75e2404275ac24af1"
dependencies = [
"anyhow",
"itertools 0.12.1",
"proc-macro2",
"quote",
"syn 2.0.114",
]
[[package]]
name = "prost-derive"
version = "0.13.5"
@@ -10500,7 +10789,7 @@ version = "0.13.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52c2c1bf36ddb1a1c396b3601a3cec27c2462e45f07c386894ec3ccf5332bd16"
dependencies = [
"prost",
"prost 0.13.5",
]
[[package]]
@@ -10725,7 +11014,7 @@ checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94"
dependencies = [
"rand_chacha 0.9.0",
"rand_core 0.9.5",
"zerocopy",
"zerocopy 0.8.33",
]
[[package]]
@@ -11020,6 +11309,12 @@ version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58"
[[package]]
name = "relative-path"
version = "1.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2"
[[package]]
name = "rend"
version = "0.4.2"
@@ -11061,13 +11356,13 @@ dependencies = [
"serde",
"serde_json",
"serde_urlencoded",
"sync_wrapper",
"sync_wrapper 1.0.2",
"tokio",
"tokio-native-tls",
"tokio-rustls 0.26.4",
"tokio-util",
"tower 0.5.3",
"tower-http",
"tower-http 0.6.8",
"tower-service",
"url",
"wasm-bindgen",
@@ -11108,12 +11403,12 @@ dependencies = [
"serde",
"serde_json",
"serde_urlencoded",
"sync_wrapper",
"sync_wrapper 1.0.2",
"tokio",
"tokio-rustls 0.26.4",
"tokio-util",
"tower 0.5.3",
"tower-http",
"tower-http 0.6.8",
"tower-service",
"url",
"wasm-bindgen",
@@ -11310,6 +11605,54 @@ dependencies = [
"serde_derive",
]
[[package]]
name = "rquickjs"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d16661bff09e9ed8e01094a188b463de45ec0693ade55b92ed54027d7ba7c40c"
dependencies = [
"rquickjs-core",
"rquickjs-macro",
]
[[package]]
name = "rquickjs-core"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c8db6379e204ef84c0811e90e7cc3e3e4d7688701db68a00d14a6db6849087b"
dependencies = [
"async-lock",
"relative-path",
"rquickjs-sys",
]
[[package]]
name = "rquickjs-macro"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6041104330c019fcd936026ae05e2446f5e8a2abef329d924f25424b7052a2f3"
dependencies = [
"convert_case 0.6.0",
"fnv",
"ident_case",
"indexmap 2.11.1",
"proc-macro-crate 1.3.1",
"proc-macro2",
"quote",
"rquickjs-core",
"syn 2.0.114",
]
[[package]]
name = "rquickjs-sys"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bc352c6b663604c3c186c000cfcc6c271f4b50bc135a285dd6d4f2a42f9790a"
dependencies = [
"bindgen 0.69.5",
"cc",
]
[[package]]
name = "rsa"
version = "0.9.10"
@@ -13426,6 +13769,12 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "sync_wrapper"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]]
name = "sync_wrapper"
version = "1.0.2"
@@ -14054,6 +14403,16 @@ dependencies = [
"tracing",
]
[[package]]
name = "tokio-io-timeout"
version = "1.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0bd86198d9ee903fedd2f9a2e72014287c0d9167e4ae43b5853007205dda1b76"
dependencies = [
"pin-project-lite",
"tokio",
]
[[package]]
name = "tokio-macros"
version = "2.5.0"
@@ -14202,6 +14561,17 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-test"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f6d24790a10a7af737693a3e8f1d03faef7e6ca0cc99aae5066f533766de545"
dependencies = [
"futures-core",
"tokio",
"tokio-stream",
]
[[package]]
name = "tokio-tungstenite"
version = "0.21.0"
@@ -14336,6 +14706,33 @@ dependencies = [
"winnow 0.7.14",
]
[[package]]
name = "tonic"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13"
dependencies = [
"async-stream",
"async-trait",
"axum 0.6.20",
"base64 0.21.7",
"bytes",
"h2 0.3.27",
"http 0.2.12",
"http-body 0.4.6",
"hyper 0.14.32",
"hyper-timeout 0.4.1",
"percent-encoding",
"pin-project",
"prost 0.12.6",
"tokio",
"tokio-stream",
"tower 0.4.13",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tonic"
version = "0.12.3"
@@ -14353,11 +14750,11 @@ dependencies = [
"http-body 1.0.1",
"http-body-util",
"hyper 1.8.1",
"hyper-timeout",
"hyper-timeout 0.5.2",
"hyper-util",
"percent-encoding",
"pin-project",
"prost",
"prost 0.13.5",
"rustls-native-certs 0.8.3",
"rustls-pemfile 2.2.0",
"socket2 0.5.10",
@@ -14371,6 +14768,26 @@ dependencies = [
"webpki-roots 0.26.11",
]
[[package]]
name = "tonic-web"
version = "0.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc3b0e1cedbf19fdfb78ef3d672cb9928e0a91a9cb4629cc0c916e8cff8aaaa1"
dependencies = [
"base64 0.21.7",
"bytes",
"http 0.2.12",
"http-body 0.4.6",
"hyper 0.14.32",
"pin-project",
"tokio-stream",
"tonic 0.11.0",
"tower-http 0.4.4",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower"
version = "0.4.13"
@@ -14400,7 +14817,7 @@ dependencies = [
"futures-core",
"futures-util",
"pin-project-lite",
"sync_wrapper",
"sync_wrapper 1.0.2",
"tokio",
"tokio-util",
"tower-layer",
@@ -14425,6 +14842,43 @@ dependencies = [
"tower-service",
]
[[package]]
name = "tower-http"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140"
dependencies = [
"bitflags 2.9.4",
"bytes",
"futures-core",
"futures-util",
"http 0.2.12",
"http-body 0.4.6",
"http-range-header",
"pin-project-lite",
"tower 0.4.13",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower-http"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5"
dependencies = [
"bitflags 2.9.4",
"bytes",
"http 1.4.0",
"http-body 1.0.1",
"http-body-util",
"pin-project-lite",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower-http"
version = "0.6.8"
@@ -14790,6 +15244,15 @@ dependencies = [
"web-time",
]
[[package]]
name = "uncased"
version = "0.9.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e1b88fcfe09e89d3866a5c11019378088af2d24c3fbd4f0543f96b479ec90697"
dependencies = [
"version_check",
]
[[package]]
name = "unic-char-property"
version = "0.9.0"
@@ -15718,10 +16181,10 @@ dependencies = [
"tokio-stream",
"tokio-tungstenite 0.24.0",
"tokio-util",
"tonic",
"tonic 0.12.3",
"tower 0.5.3",
"tower-cookies",
"tower-http",
"tower-http 0.6.8",
"tracing",
"tracing-subscriber",
"ulid",
@@ -15866,7 +16329,7 @@ dependencies = [
"tokio-postgres 0.7.13",
"tokio-stream",
"tokio-util",
"tonic",
"tonic 0.12.3",
"tracing",
"tracing-appender",
"tracing-opentelemetry",
@@ -15920,6 +16383,29 @@ dependencies = [
"windmill-common",
]
[[package]]
name = "windmill-local"
version = "0.1.0"
dependencies = [
"anyhow",
"async-recursion",
"axum 0.7.9",
"chrono",
"libsql",
"rquickjs",
"serde",
"serde_json",
"thiserror 1.0.69",
"tokio",
"tokio-test",
"tower 0.4.13",
"tower-http 0.5.2",
"tracing",
"tracing-subscriber",
"uuid",
"windmill-common",
]
[[package]]
name = "windmill-macros"
version = "1.613.4"
@@ -17128,13 +17614,34 @@ dependencies = [
"synstructure 0.13.2",
]
[[package]]
name = "zerocopy"
version = "0.7.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0"
dependencies = [
"byteorder",
"zerocopy-derive 0.7.35",
]
[[package]]
name = "zerocopy"
version = "0.8.33"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "668f5168d10b9ee831de31933dc111a459c97ec93225beb307aed970d1372dfd"
dependencies = [
"zerocopy-derive",
"zerocopy-derive 0.8.33",
]
[[package]]
name = "zerocopy-derive"
version = "0.7.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.114",
]
[[package]]

View File

@@ -18,6 +18,7 @@ members = [
"./windmill-indexer",
"./windmill-macros",
"./windmill-oauth",
"./windmill-local",
"./parsers/windmill-parser",
"./parsers/windmill-parser-ts",
"./parsers/windmill-parser-go",

View File

@@ -0,0 +1,52 @@
[package]
name = "windmill-local"
version = "0.1.0"
edition = "2021"
description = "Windmill local mode with libSQL/Turso support for preview execution"
[dependencies]
# libSQL - Turso's SQLite fork (used for local and remote Turso connections)
# Note: libsql IS the Turso database driver - Turso is built on libSQL
libsql = "0.9"
# Async runtime
tokio = { version = "1", features = ["full", "sync", "macros"] }
# Serialization
serde = { version = "1", features = ["derive"] }
serde_json = "1"
# Error handling
anyhow = "1"
thiserror = "1"
# UUID for job IDs
uuid = { version = "1", features = ["v4", "serde"] }
# Timestamps
chrono = { version = "0.4", features = ["serde"] }
# Tracing
tracing = "0.1"
# HTTP server
axum = "0.7"
tower = "0.4"
tower-http = { version = "0.5", features = ["cors", "trace"] }
# Windmill types (flows, scripts, etc.)
windmill-common = { path = "../windmill-common", default-features = false }
# For input transforms (JavaScript evaluation)
rquickjs = { version = "0.8", features = ["bindgen", "classes", "loader", "array-buffer", "futures"] }
# For recursive async functions
async-recursion = "1"
[dev-dependencies]
tokio-test = "0.4"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
[[example]]
name = "local_server"
path = "examples/local_server.rs"

View File

@@ -0,0 +1,65 @@
//! Example: Run the Windmill local server
//!
//! This demonstrates running a local Windmill server with libSQL/SQLite backend.
//!
//! Run with:
//! cargo run -p windmill-local --example local_server
//!
//! Test with:
//! # Health check
//! curl http://localhost:8000/health
//!
//! # Run a bash preview (async)
//! curl -X POST http://localhost:8000/api/w/local/jobs/run/preview \
//! -H "Content-Type: application/json" \
//! -d '{"content": "echo Hello World", "language": "bash"}'
//!
//! # Run a bash preview and wait for result
//! curl -X POST http://localhost:8000/api/w/local/jobs/run_wait_result/preview \
//! -H "Content-Type: application/json" \
//! -d '{"content": "echo 42", "language": "bash"}'
//!
//! # Run a Python preview
//! curl -X POST http://localhost:8000/api/w/local/jobs/run_wait_result/preview \
//! -H "Content-Type: application/json" \
//! -d '{"content": "def main(x=1): return x * 2", "language": "python3", "args": {"x": 21}}'
//!
//! # Run a flow preview (linear flow with two steps)
//! curl -X POST http://localhost:8000/api/w/local/jobs/run_wait_result/preview_flow \
//! -H "Content-Type: application/json" \
//! -d '{
//! "value": {
//! "modules": [
//! {"id": "step1", "value": {"type": "rawscript", "language": "bash", "content": "echo 10"}},
//! {"id": "step2", "value": {"type": "identity"}}
//! ]
//! },
//! "args": {}
//! }'
use std::net::SocketAddr;
use windmill_local::LocalServer;
#[tokio::main]
async fn main() {
// Initialize tracing
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::from_default_env()
.add_directive("windmill_local=info".parse().unwrap()),
)
.init();
let addr: SocketAddr = "0.0.0.0:8000".parse().unwrap();
println!("Starting Windmill Local Server on {}", addr);
println!();
println!("Test endpoints:");
println!(" Health: curl http://localhost:8000/health");
println!(" Preview: curl -X POST http://localhost:8000/api/w/local/jobs/run_wait_result/preview \\");
println!(" -H 'Content-Type: application/json' \\");
println!(" -d '{{\"content\": \"echo Hello\", \"language\": \"bash\"}}'");
println!();
let server = LocalServer::new(addr).await.expect("Failed to create server");
server.run().await.expect("Server error");
}

View File

@@ -0,0 +1,161 @@
//! Database connection and initialization for local mode
use libsql::{Builder, Connection, Database};
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::error::Result;
use crate::schema;
/// Local database wrapper
///
/// For local mode, we use a single connection with in-process coordination.
/// This simplifies the implementation since we don't need `FOR UPDATE SKIP LOCKED`.
pub struct LocalDb {
#[allow(dead_code)]
db: Database,
/// Single connection for all operations (simplifies transaction handling)
conn: Arc<Mutex<Connection>>,
}
impl LocalDb {
/// Create an in-memory database (for testing/ephemeral use)
pub async fn in_memory() -> Result<Self> {
let db = Builder::new_local(":memory:").build().await?;
let conn = db.connect()?;
let local_db = Self {
db,
conn: Arc::new(Mutex::new(conn)),
};
local_db.init_schema().await?;
Ok(local_db)
}
/// Create a file-based database
pub async fn file(path: &str) -> Result<Self> {
let db = Builder::new_local(path).build().await?;
let conn = db.connect()?;
let local_db = Self {
db,
conn: Arc::new(Mutex::new(conn)),
};
local_db.init_schema().await?;
Ok(local_db)
}
/// Create a Turso remote database connection
/// This would be used for the multi-writer scenario
pub async fn turso_remote(url: &str, auth_token: &str) -> Result<Self> {
let db = Builder::new_remote(url.to_string(), auth_token.to_string())
.build()
.await?;
let conn = db.connect()?;
let local_db = Self {
db,
conn: Arc::new(Mutex::new(conn)),
};
local_db.init_schema().await?;
Ok(local_db)
}
/// Initialize the schema
async fn init_schema(&self) -> Result<()> {
let conn = self.conn.lock().await;
// Execute schema as multiple statements
conn.execute_batch(schema::SCHEMA).await?;
Ok(())
}
/// Reset the database (drop and recreate all tables)
pub async fn reset(&self) -> Result<()> {
let conn = self.conn.lock().await;
conn.execute_batch(schema::DROP_SCHEMA).await?;
conn.execute_batch(schema::SCHEMA).await?;
Ok(())
}
/// Get a reference to the connection (locked)
pub async fn conn(&self) -> tokio::sync::MutexGuard<'_, Connection> {
self.conn.lock().await
}
/// Execute a simple query that returns no rows
pub async fn execute(&self, sql: &str, params: impl libsql::params::IntoParams) -> Result<u64> {
let conn = self.conn.lock().await;
let rows_affected = conn.execute(sql, params).await?;
Ok(rows_affected)
}
/// Execute a query and return all rows
pub async fn query(
&self,
sql: &str,
params: impl libsql::params::IntoParams,
) -> Result<libsql::Rows> {
let conn = self.conn.lock().await;
let rows = conn.query(sql, params).await?;
Ok(rows)
}
/// Execute a batch of statements (for transactions)
pub async fn execute_batch(&self, sql: &str) -> Result<()> {
let conn = self.conn.lock().await;
conn.execute_batch(sql).await?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_in_memory_db() {
let db = LocalDb::in_memory().await.unwrap();
// Verify tables exist
let rows = db
.query(
"SELECT name FROM sqlite_master WHERE type='table' ORDER BY name",
(),
)
.await
.unwrap();
let mut tables = Vec::new();
let mut rows = rows;
while let Some(row) = rows.next().await.unwrap() {
let name: String = row.get(0).unwrap();
tables.push(name);
}
assert!(tables.contains(&"v2_job".to_string()));
assert!(tables.contains(&"v2_job_queue".to_string()));
assert!(tables.contains(&"v2_job_completed".to_string()));
}
#[tokio::test]
async fn test_reset_db() {
let db = LocalDb::in_memory().await.unwrap();
// Insert a job
db.execute(
"INSERT INTO v2_job (id, kind) VALUES ('test-uuid', 'preview')",
(),
)
.await
.unwrap();
// Reset
db.reset().await.unwrap();
// Verify job is gone
let mut rows = db
.query("SELECT COUNT(*) FROM v2_job", ())
.await
.unwrap();
let row = rows.next().await.unwrap().unwrap();
let count: i64 = row.get(0).unwrap();
assert_eq!(count, 0);
}
}

View File

@@ -0,0 +1,29 @@
//! Error types for local mode
use thiserror::Error;
#[derive(Error, Debug)]
pub enum LocalError {
#[error("Database error: {0}")]
Database(#[from] libsql::Error),
#[error("Serialization error: {0}")]
Serialization(#[from] serde_json::Error),
#[error("Job not found: {0}")]
JobNotFound(uuid::Uuid),
#[error("Invalid job state: {0}")]
InvalidJobState(String),
#[error("Queue is empty")]
QueueEmpty,
#[error("Execution error: {0}")]
Execution(String),
#[error("Timeout")]
Timeout,
}
pub type Result<T> = std::result::Result<T, LocalError>;

View File

@@ -0,0 +1,330 @@
//! Simple script executor for local mode
//!
//! This is a minimal executor that supports a few languages for demonstration.
//! A full implementation would integrate with windmill-worker's execution logic.
use std::process::Stdio;
use tokio::process::Command;
use tokio::io::AsyncReadExt;
use crate::error::{LocalError, Result};
use crate::jobs::ScriptLang;
/// Result of script execution
#[derive(Debug)]
pub struct ExecutionResult {
pub success: bool,
pub result: serde_json::Value,
pub logs: String,
}
/// Execute a script with the given language and arguments
pub async fn execute_script(
language: ScriptLang,
code: &str,
args: &serde_json::Value,
) -> Result<ExecutionResult> {
match language {
ScriptLang::Bash => execute_bash(code, args).await,
ScriptLang::Python3 => execute_python(code, args).await,
ScriptLang::Deno => execute_deno(code, args).await,
ScriptLang::Bun => execute_bun(code, args).await,
_ => Ok(ExecutionResult {
success: false,
result: serde_json::json!({
"error": format!("Language {:?} not supported in local mode yet", language)
}),
logs: format!("Language {:?} not supported in local mode", language),
}),
}
}
/// Execute a bash script
async fn execute_bash(code: &str, args: &serde_json::Value) -> Result<ExecutionResult> {
// Create environment variables from args
let mut env_vars = Vec::new();
if let serde_json::Value::Object(map) = args {
for (key, value) in map {
let val_str = match value {
serde_json::Value::String(s) => s.clone(),
_ => value.to_string(),
};
env_vars.push((key.to_uppercase(), val_str));
}
}
let mut child = Command::new("bash")
.arg("-c")
.arg(code)
.envs(env_vars)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(|e| LocalError::Execution(format!("Failed to spawn bash: {}", e)))?;
let status = child
.wait()
.await
.map_err(|e| LocalError::Execution(format!("Failed to wait for bash: {}", e)))?;
let mut stdout = String::new();
let mut stderr = String::new();
if let Some(mut out) = child.stdout.take() {
out.read_to_string(&mut stdout).await.ok();
}
if let Some(mut err) = child.stderr.take() {
err.read_to_string(&mut stderr).await.ok();
}
let logs = format!("{}{}", stdout, stderr);
let success = status.success();
// Try to parse stdout as JSON, otherwise use as string
let result = if success {
let trimmed = stdout.trim();
serde_json::from_str(trimmed).unwrap_or_else(|_| serde_json::json!(trimmed))
} else {
serde_json::json!({ "error": stderr.trim() })
};
Ok(ExecutionResult {
success,
result,
logs,
})
}
/// Execute a Python script
async fn execute_python(code: &str, args: &serde_json::Value) -> Result<ExecutionResult> {
// Wrap the code to handle args and return JSON result
let wrapped_code = format!(
r#"
import json
import sys
# Args passed as JSON
args = json.loads('''{}''')
# User code
{}
# Call main if it exists
if 'main' in dir():
result = main(**args)
print(json.dumps(result))
"#,
serde_json::to_string(args)?,
code
);
let mut child = Command::new("python3")
.arg("-c")
.arg(&wrapped_code)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(|e| LocalError::Execution(format!("Failed to spawn python3: {}", e)))?;
let status = child
.wait()
.await
.map_err(|e| LocalError::Execution(format!("Failed to wait for python3: {}", e)))?;
let mut stdout = String::new();
let mut stderr = String::new();
if let Some(mut out) = child.stdout.take() {
out.read_to_string(&mut stdout).await.ok();
}
if let Some(mut err) = child.stderr.take() {
err.read_to_string(&mut stderr).await.ok();
}
let logs = format!("{}{}", stdout, stderr);
let success = status.success();
let result = if success {
let trimmed = stdout.trim();
// Get the last line as result (in case there's debug output)
let last_line = trimmed.lines().last().unwrap_or("");
serde_json::from_str(last_line).unwrap_or_else(|_| serde_json::json!(trimmed))
} else {
serde_json::json!({ "error": stderr.trim() })
};
Ok(ExecutionResult {
success,
result,
logs,
})
}
/// Execute a Deno/TypeScript script
async fn execute_deno(code: &str, args: &serde_json::Value) -> Result<ExecutionResult> {
// Wrap the code to handle args and return JSON result
let wrapped_code = format!(
r#"
const args = {};
{}
// Call main if it exists
if (typeof main === 'function') {{
const result = await main(args);
console.log(JSON.stringify(result));
}}
"#,
serde_json::to_string(args)?,
code
);
let mut child = Command::new("deno")
.arg("eval")
.arg("--unstable")
.arg(&wrapped_code)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(|e| LocalError::Execution(format!("Failed to spawn deno: {}", e)))?;
let status = child
.wait()
.await
.map_err(|e| LocalError::Execution(format!("Failed to wait for deno: {}", e)))?;
let mut stdout = String::new();
let mut stderr = String::new();
if let Some(mut out) = child.stdout.take() {
out.read_to_string(&mut stdout).await.ok();
}
if let Some(mut err) = child.stderr.take() {
err.read_to_string(&mut stderr).await.ok();
}
let logs = format!("{}{}", stdout, stderr);
let success = status.success();
let result = if success {
let trimmed = stdout.trim();
let last_line = trimmed.lines().last().unwrap_or("");
serde_json::from_str(last_line).unwrap_or_else(|_| serde_json::json!(trimmed))
} else {
serde_json::json!({ "error": stderr.trim() })
};
Ok(ExecutionResult {
success,
result,
logs,
})
}
/// Execute a Bun/TypeScript script
async fn execute_bun(code: &str, args: &serde_json::Value) -> Result<ExecutionResult> {
// Similar to Deno but using Bun
let wrapped_code = format!(
r#"
const args = {};
{}
// Call main if it exists
if (typeof main === 'function') {{
const result = await main(args);
console.log(JSON.stringify(result));
}}
"#,
serde_json::to_string(args)?,
code
);
let mut child = Command::new("bun")
.arg("eval")
.arg(&wrapped_code)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.map_err(|e| LocalError::Execution(format!("Failed to spawn bun: {}", e)))?;
let status = child
.wait()
.await
.map_err(|e| LocalError::Execution(format!("Failed to wait for bun: {}", e)))?;
let mut stdout = String::new();
let mut stderr = String::new();
if let Some(mut out) = child.stdout.take() {
out.read_to_string(&mut stdout).await.ok();
}
if let Some(mut err) = child.stderr.take() {
err.read_to_string(&mut stderr).await.ok();
}
let logs = format!("{}{}", stdout, stderr);
let success = status.success();
let result = if success {
let trimmed = stdout.trim();
let last_line = trimmed.lines().last().unwrap_or("");
serde_json::from_str(last_line).unwrap_or_else(|_| serde_json::json!(trimmed))
} else {
serde_json::json!({ "error": stderr.trim() })
};
Ok(ExecutionResult {
success,
result,
logs,
})
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_bash_execution() {
let result = execute_script(
ScriptLang::Bash,
"echo 42",
&serde_json::json!({}),
)
.await
.unwrap();
assert!(result.success);
// Output "42" is parsed as JSON number
assert_eq!(result.result, serde_json::json!(42));
}
#[tokio::test]
async fn test_bash_with_args() {
let result = execute_script(
ScriptLang::Bash,
"echo $NAME",
&serde_json::json!({"name": "world"}),
)
.await
.unwrap();
assert!(result.success);
assert_eq!(result.result, serde_json::json!("world"));
}
#[tokio::test]
async fn test_bash_json_output() {
let result = execute_script(
ScriptLang::Bash,
r#"echo '{"key": "value"}'"#,
&serde_json::json!({}),
)
.await
.unwrap();
assert!(result.success);
assert_eq!(result.result, serde_json::json!({"key": "value"}));
}
}

View File

@@ -0,0 +1,750 @@
//! Flow executor for local mode
//!
//! This module implements flow execution using the real Windmill flow types
//! from windmill-common, but with libSQL as the backend.
use std::collections::HashMap;
use chrono::Utc;
use serde::{Deserialize, Serialize};
use serde_json::value::RawValue;
use uuid::Uuid;
use windmill_common::flows::{
Branch, FlowModule, FlowModuleValue, FlowValue, InputTransform,
};
use windmill_common::scripts::ScriptLang as WmScriptLang;
use crate::db::LocalDb;
use crate::error::{LocalError, Result};
use crate::executor::{execute_script, ExecutionResult};
use crate::jobs::ScriptLang;
/// Flow execution state
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowStatus {
pub step: usize,
pub modules: Vec<ModuleStatus>,
#[serde(skip_serializing_if = "Option::is_none")]
pub failure_module: Option<FailureModule>,
#[serde(skip_serializing_if = "Option::is_none")]
pub retry: Option<RetryStatus>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ModuleStatus {
pub id: String,
#[serde(rename = "type")]
pub status_type: ModuleStatusType,
#[serde(skip_serializing_if = "Option::is_none")]
pub result: Option<serde_json::Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub iterator: Option<IteratorStatus>,
#[serde(skip_serializing_if = "Option::is_none")]
pub branch_chosen: Option<BranchChosen>,
#[serde(skip_serializing_if = "Option::is_none")]
pub branchall: Option<BranchAllStatus>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub enum ModuleStatusType {
WaitingForPriorSteps,
WaitingForEvents,
WaitingForExecutor,
InProgress,
Success,
Failure,
Skipped,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IteratorStatus {
pub index: usize,
pub itered: Vec<serde_json::Value>,
pub args: Option<serde_json::Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BranchChosen {
#[serde(rename = "type")]
pub branch_type: String,
pub branch: Option<usize>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BranchAllStatus {
pub branch: usize,
pub len: usize,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailureModule {
pub id: String,
pub error: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryStatus {
pub fail_count: u32,
}
/// Context passed through flow execution
#[derive(Debug, Clone)]
pub struct FlowContext {
pub flow_input: serde_json::Value,
pub previous_result: serde_json::Value,
pub results_by_id: HashMap<String, serde_json::Value>,
}
impl FlowContext {
pub fn new(flow_input: serde_json::Value) -> Self {
Self {
flow_input: flow_input.clone(),
previous_result: flow_input,
results_by_id: HashMap::new(),
}
}
/// Get the evaluation context for input transforms
pub fn to_eval_context(&self) -> serde_json::Value {
serde_json::json!({
"flow_input": self.flow_input,
"previous_result": self.previous_result,
"results": self.results_by_id,
})
}
}
/// Execute a flow and return the result
pub async fn execute_flow(
db: &LocalDb,
flow_value: &FlowValue,
flow_input: serde_json::Value,
) -> Result<(serde_json::Value, FlowStatus)> {
let mut ctx = FlowContext::new(flow_input);
let mut status = FlowStatus {
step: 0,
modules: Vec::new(),
failure_module: None,
retry: None,
};
// Execute modules sequentially
for (idx, module) in flow_value.modules.iter().enumerate() {
status.step = idx;
let module_status = ModuleStatus {
id: module.id.clone(),
status_type: ModuleStatusType::InProgress,
result: None,
iterator: None,
branch_chosen: None,
branchall: None,
};
status.modules.push(module_status);
tracing::info!("Executing flow module {}: {}", idx, module.id);
match execute_module(db, module, &mut ctx).await {
Ok(result) => {
// Update context with result
ctx.results_by_id.insert(module.id.clone(), result.clone());
ctx.previous_result = result.clone();
// Update status
if let Some(ms) = status.modules.last_mut() {
ms.status_type = ModuleStatusType::Success;
ms.result = Some(result);
}
}
Err(e) => {
// Module failed
tracing::error!("Flow module {} failed: {}", module.id, e);
if let Some(ms) = status.modules.last_mut() {
ms.status_type = ModuleStatusType::Failure;
ms.result = Some(serde_json::json!({"error": e.to_string()}));
}
status.failure_module = Some(FailureModule {
id: module.id.clone(),
error: e.to_string(),
});
// Check if continue_on_error is set
if !module.continue_on_error.unwrap_or(false) {
return Ok((serde_json::json!({"error": e.to_string()}), status));
}
}
}
}
Ok((ctx.previous_result, status))
}
/// Execute a single flow module
#[async_recursion::async_recursion]
async fn execute_module(
db: &LocalDb,
module: &FlowModule,
ctx: &mut FlowContext,
) -> Result<serde_json::Value> {
// Check skip_if condition
if let Some(skip_if) = &module.skip_if {
let should_skip = evaluate_expr(&skip_if.expr, &ctx.to_eval_context())?;
if should_skip.as_bool().unwrap_or(false) {
tracing::info!("Skipping module {} due to skip_if condition", module.id);
return Ok(ctx.previous_result.clone());
}
}
// Parse the module value
let module_value: FlowModuleValue = serde_json::from_str(module.value.get())
.map_err(|e| LocalError::Execution(format!("Failed to parse module value: {}", e)))?;
match module_value {
FlowModuleValue::Identity => {
Ok(ctx.previous_result.clone())
}
FlowModuleValue::RawScript { content, language, input_transforms, .. } => {
let args = resolve_input_transforms(&input_transforms, ctx)?;
let lang = convert_script_lang(&language);
let result = execute_script(lang, &content, &args).await?;
if result.success {
Ok(result.result)
} else {
Err(LocalError::Execution(result.result.to_string()))
}
}
FlowModuleValue::Script { path, input_transforms, .. } => {
// In local mode, we don't have access to saved scripts
Err(LocalError::Execution(format!(
"Script references (path: {}) are not supported in local mode. Use rawscript instead.",
path
)))
}
FlowModuleValue::Flow { path, .. } => {
// In local mode, we don't have access to saved flows
Err(LocalError::Execution(format!(
"Flow references (path: {}) are not supported in local mode. Use inline modules instead.",
path
)))
}
FlowModuleValue::ForloopFlow { iterator, modules, skip_failures, parallel, .. } => {
execute_forloop(db, &iterator, &modules, skip_failures, parallel, ctx).await
}
FlowModuleValue::WhileloopFlow { modules, skip_failures, .. } => {
execute_whileloop(db, &modules, skip_failures, ctx).await
}
FlowModuleValue::BranchOne { branches, default, .. } => {
execute_branch_one(db, &branches, &default, ctx).await
}
FlowModuleValue::BranchAll { branches, parallel } => {
execute_branch_all(db, &branches, parallel, ctx).await
}
FlowModuleValue::FlowScript { .. } => {
Err(LocalError::Execution(
"FlowScript (internal reference) is not supported in local mode".to_string()
))
}
FlowModuleValue::AIAgent { .. } => {
Err(LocalError::Execution(
"AIAgent is not supported in local mode".to_string()
))
}
}
}
/// Execute a for loop
#[async_recursion::async_recursion]
async fn execute_forloop(
db: &LocalDb,
iterator: &InputTransform,
modules: &[FlowModule],
skip_failures: bool,
_parallel: bool, // TODO: implement parallel execution
ctx: &mut FlowContext,
) -> Result<serde_json::Value> {
// Evaluate the iterator expression
let iter_value = evaluate_input_transform(iterator, ctx)?;
let items = match iter_value.as_array() {
Some(arr) => arr.clone(),
None => {
return Err(LocalError::Execution(
"For loop iterator must evaluate to an array".to_string()
));
}
};
let mut results = Vec::new();
for (idx, item) in items.iter().enumerate() {
tracing::debug!("For loop iteration {} of {}", idx + 1, items.len());
// Create iteration context
let mut iter_ctx = FlowContext {
flow_input: ctx.flow_input.clone(),
previous_result: item.clone(),
results_by_id: ctx.results_by_id.clone(),
};
// Add iter context
iter_ctx.results_by_id.insert("iter".to_string(), serde_json::json!({
"index": idx,
"value": item,
}));
// Execute modules in sequence
let mut iter_result = item.clone();
let mut had_error = false;
for module in modules {
match execute_module(db, module, &mut iter_ctx).await {
Ok(result) => {
iter_ctx.results_by_id.insert(module.id.clone(), result.clone());
iter_ctx.previous_result = result.clone();
iter_result = result;
}
Err(e) => {
had_error = true;
if skip_failures {
tracing::warn!("For loop iteration {} failed (skipping): {}", idx, e);
iter_result = serde_json::json!({"error": e.to_string()});
} else {
return Err(e);
}
break;
}
}
}
if !had_error || skip_failures {
results.push(iter_result);
}
}
Ok(serde_json::Value::Array(results))
}
/// Execute a while loop
#[async_recursion::async_recursion]
async fn execute_whileloop(
db: &LocalDb,
modules: &[FlowModule],
skip_failures: bool,
ctx: &mut FlowContext,
) -> Result<serde_json::Value> {
const MAX_ITERATIONS: usize = 1000;
let mut results = Vec::new();
let mut iteration = 0;
loop {
if iteration >= MAX_ITERATIONS {
return Err(LocalError::Execution(format!(
"While loop exceeded maximum iterations ({})", MAX_ITERATIONS
)));
}
// Execute modules
let mut iter_result = ctx.previous_result.clone();
let mut should_continue = true;
for module in modules {
match execute_module(db, module, ctx).await {
Ok(result) => {
ctx.results_by_id.insert(module.id.clone(), result.clone());
ctx.previous_result = result.clone();
iter_result = result;
}
Err(e) => {
if skip_failures {
tracing::warn!("While loop iteration {} failed (skipping): {}", iteration, e);
iter_result = serde_json::json!({"error": e.to_string()});
} else {
return Err(e);
}
should_continue = false;
break;
}
}
}
results.push(iter_result);
iteration += 1;
// Check stop condition (result should be truthy to continue)
if !should_continue {
break;
}
// Check if the result indicates we should stop
let continue_loop = match &ctx.previous_result {
serde_json::Value::Bool(b) => *b,
serde_json::Value::Null => false,
serde_json::Value::Object(obj) => {
// Check for a "continue" field
obj.get("continue")
.and_then(|v| v.as_bool())
.unwrap_or(false)
}
_ => false,
};
if !continue_loop {
break;
}
}
Ok(serde_json::Value::Array(results))
}
/// Execute branch-one (if/else)
#[async_recursion::async_recursion]
async fn execute_branch_one(
db: &LocalDb,
branches: &[Branch],
default: &[FlowModule],
ctx: &mut FlowContext,
) -> Result<serde_json::Value> {
// Find the first matching branch
for (idx, branch) in branches.iter().enumerate() {
let condition = evaluate_expr(&branch.expr, &ctx.to_eval_context())?;
if condition.as_bool().unwrap_or(false) {
tracing::debug!("Branch {} matched", idx);
return execute_branch_modules(db, &branch.modules, ctx).await;
}
}
// No branch matched, execute default
tracing::debug!("No branch matched, executing default");
execute_branch_modules(db, default, ctx).await
}
/// Execute branch-all (parallel branches)
#[async_recursion::async_recursion]
async fn execute_branch_all(
db: &LocalDb,
branches: &[Branch],
_parallel: bool, // TODO: implement true parallel execution
ctx: &mut FlowContext,
) -> Result<serde_json::Value> {
let mut results = Vec::new();
// Execute all branches (sequentially for now)
for (idx, branch) in branches.iter().enumerate() {
tracing::debug!("Executing branch {}", idx);
let mut branch_ctx = ctx.clone();
match execute_branch_modules(db, &branch.modules, &mut branch_ctx).await {
Ok(result) => {
results.push(result);
}
Err(e) => {
if branch.skip_failure {
tracing::warn!("Branch {} failed (skipping): {}", idx, e);
results.push(serde_json::json!({"error": e.to_string()}));
} else {
return Err(e);
}
}
}
}
Ok(serde_json::Value::Array(results))
}
/// Execute a sequence of modules in a branch
#[async_recursion::async_recursion]
async fn execute_branch_modules(
db: &LocalDb,
modules: &[FlowModule],
ctx: &mut FlowContext,
) -> Result<serde_json::Value> {
let mut result = ctx.previous_result.clone();
for module in modules {
result = execute_module(db, module, ctx).await?;
ctx.results_by_id.insert(module.id.clone(), result.clone());
ctx.previous_result = result.clone();
}
Ok(result)
}
/// Resolve input transforms to concrete arguments
fn resolve_input_transforms(
transforms: &HashMap<String, InputTransform>,
ctx: &FlowContext,
) -> Result<serde_json::Value> {
let mut args = serde_json::Map::new();
for (key, transform) in transforms {
let value = evaluate_input_transform(transform, ctx)?;
args.insert(key.clone(), value);
}
Ok(serde_json::Value::Object(args))
}
/// Evaluate an input transform
fn evaluate_input_transform(
transform: &InputTransform,
ctx: &FlowContext,
) -> Result<serde_json::Value> {
match transform {
InputTransform::Static { value } => {
serde_json::from_str(value.get())
.map_err(|e| LocalError::Execution(format!("Invalid static value: {}", e)))
}
InputTransform::Javascript { expr } => {
evaluate_expr(expr, &ctx.to_eval_context())
}
InputTransform::Ai => {
Err(LocalError::Execution("AI input transforms are not supported in local mode".to_string()))
}
}
}
/// Evaluate a JavaScript expression
fn evaluate_expr(expr: &str, context: &serde_json::Value) -> Result<serde_json::Value> {
let expr = expr.trim();
// Handle comparison operators
if let Some(result) = try_evaluate_comparison(expr, context) {
return Ok(result);
}
// Handle simple variable references
if let Some(val) = resolve_path(expr, context) {
return Ok(val);
}
// Handle boolean literals
if expr == "true" {
return Ok(serde_json::Value::Bool(true));
}
if expr == "false" {
return Ok(serde_json::Value::Bool(false));
}
// Handle numeric literals
if let Ok(n) = expr.parse::<i64>() {
return Ok(serde_json::json!(n));
}
if let Ok(n) = expr.parse::<f64>() {
return Ok(serde_json::json!(n));
}
// Handle string literals
if (expr.starts_with('"') && expr.ends_with('"')) ||
(expr.starts_with('\'') && expr.ends_with('\'')) {
return Ok(serde_json::json!(&expr[1..expr.len()-1]));
}
// For complex expressions, we'd need a full JS runtime
tracing::warn!("Complex expression not evaluated: {}", expr);
Ok(serde_json::json!(expr))
}
/// Try to resolve a path like "flow_input.x" or "results.a.b"
fn resolve_path(path: &str, context: &serde_json::Value) -> Option<serde_json::Value> {
let parts: Vec<&str> = path.split('.').collect();
if parts.is_empty() {
return None;
}
let mut current = context.get(parts[0])?;
for part in &parts[1..] {
current = current.get(*part)?;
}
Some(current.clone())
}
/// Try to evaluate a comparison expression
fn try_evaluate_comparison(expr: &str, context: &serde_json::Value) -> Option<serde_json::Value> {
// Supported operators: >, <, >=, <=, ==, !=, ===, !==
let operators = ["===", "!==", ">=", "<=", "==", "!=", ">", "<"];
for op in operators {
if let Some(pos) = expr.find(op) {
let left = expr[..pos].trim();
let right = expr[pos + op.len()..].trim();
let left_val = evaluate_expr(left, context).ok()?;
let right_val = evaluate_expr(right, context).ok()?;
let result = match op {
">" => compare_values(&left_val, &right_val, |a, b| a > b),
"<" => compare_values(&left_val, &right_val, |a, b| a < b),
">=" => compare_values(&left_val, &right_val, |a, b| a >= b),
"<=" => compare_values(&left_val, &right_val, |a, b| a <= b),
"==" | "===" => Some(left_val == right_val),
"!=" | "!==" => Some(left_val != right_val),
_ => None,
};
return result.map(serde_json::Value::Bool);
}
}
// Try logical operators
if let Some(pos) = expr.find("&&") {
let left = expr[..pos].trim();
let right = expr[pos + 2..].trim();
let left_val = evaluate_expr(left, context).ok()?;
let right_val = evaluate_expr(right, context).ok()?;
return Some(serde_json::Value::Bool(
is_truthy(&left_val) && is_truthy(&right_val)
));
}
if let Some(pos) = expr.find("||") {
let left = expr[..pos].trim();
let right = expr[pos + 2..].trim();
let left_val = evaluate_expr(left, context).ok()?;
let right_val = evaluate_expr(right, context).ok()?;
return Some(serde_json::Value::Bool(
is_truthy(&left_val) || is_truthy(&right_val)
));
}
None
}
/// Compare two JSON values numerically
fn compare_values<F>(left: &serde_json::Value, right: &serde_json::Value, cmp: F) -> Option<bool>
where
F: Fn(f64, f64) -> bool,
{
let left_num = value_to_number(left)?;
let right_num = value_to_number(right)?;
Some(cmp(left_num, right_num))
}
/// Convert a JSON value to a number
fn value_to_number(val: &serde_json::Value) -> Option<f64> {
match val {
serde_json::Value::Number(n) => n.as_f64(),
serde_json::Value::String(s) => s.parse().ok(),
serde_json::Value::Bool(b) => Some(if *b { 1.0 } else { 0.0 }),
_ => None,
}
}
/// Check if a value is truthy (JavaScript semantics)
fn is_truthy(val: &serde_json::Value) -> bool {
match val {
serde_json::Value::Null => false,
serde_json::Value::Bool(b) => *b,
serde_json::Value::Number(n) => n.as_f64().map(|f| f != 0.0).unwrap_or(false),
serde_json::Value::String(s) => !s.is_empty(),
serde_json::Value::Array(a) => !a.is_empty(),
serde_json::Value::Object(_) => true,
}
}
/// Convert windmill-common ScriptLang to local ScriptLang
fn convert_script_lang(lang: &WmScriptLang) -> ScriptLang {
match lang {
WmScriptLang::Deno => ScriptLang::Deno,
WmScriptLang::Python3 => ScriptLang::Python3,
WmScriptLang::Bash => ScriptLang::Bash,
WmScriptLang::Go => ScriptLang::Go,
WmScriptLang::Bun => ScriptLang::Bun,
WmScriptLang::Nativets => ScriptLang::Deno, // Fallback to Deno
_ => ScriptLang::Bash, // Default fallback
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_evaluate_simple_expr() {
let ctx = serde_json::json!({
"flow_input": {"x": 10},
"previous_result": 42,
"results": {"a": "hello"},
});
assert_eq!(
evaluate_expr("flow_input", &ctx).unwrap(),
serde_json::json!({"x": 10})
);
assert_eq!(
evaluate_expr("previous_result", &ctx).unwrap(),
serde_json::json!(42)
);
assert_eq!(
evaluate_expr("results.a", &ctx).unwrap(),
serde_json::json!("hello")
);
assert_eq!(
evaluate_expr("flow_input.x", &ctx).unwrap(),
serde_json::json!(10)
);
}
#[test]
fn test_evaluate_comparison_expr() {
let ctx = serde_json::json!({
"flow_input": {"x": 10, "y": 5},
"previous_result": 42,
});
assert_eq!(
evaluate_expr("flow_input.x > 5", &ctx).unwrap(),
serde_json::Value::Bool(true)
);
assert_eq!(
evaluate_expr("flow_input.x < 5", &ctx).unwrap(),
serde_json::Value::Bool(false)
);
assert_eq!(
evaluate_expr("flow_input.x >= 10", &ctx).unwrap(),
serde_json::Value::Bool(true)
);
assert_eq!(
evaluate_expr("flow_input.y == 5", &ctx).unwrap(),
serde_json::Value::Bool(true)
);
assert_eq!(
evaluate_expr("previous_result > 40", &ctx).unwrap(),
serde_json::Value::Bool(true)
);
}
#[test]
fn test_evaluate_logical_expr() {
let ctx = serde_json::json!({
"flow_input": {"a": true, "b": false},
});
// Simple boolean logic (complex expressions with mixed operators need parentheses support)
assert_eq!(
evaluate_expr("flow_input.a && flow_input.a", &ctx).unwrap(),
serde_json::Value::Bool(true)
);
assert_eq!(
evaluate_expr("flow_input.a && flow_input.b", &ctx).unwrap(),
serde_json::Value::Bool(false)
);
assert_eq!(
evaluate_expr("flow_input.b || flow_input.a", &ctx).unwrap(),
serde_json::Value::Bool(true)
);
}
}

View File

@@ -0,0 +1,495 @@
//! Job types and operations for local mode
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::db::LocalDb;
use crate::error::{LocalError, Result};
/// Job kind (mirrors windmill-common JobKind)
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum JobKind {
Script,
Preview,
Flow,
FlowPreview,
Dependencies,
FlowDependencies,
ScriptHub,
Identity,
Http,
Graphql,
Postgresql,
Noop,
AppDependencies,
DeploymentCallback,
SingleScriptFlow,
FlowScript,
FlowNode,
AppScript,
}
impl JobKind {
pub fn as_str(&self) -> &'static str {
match self {
JobKind::Script => "script",
JobKind::Preview => "preview",
JobKind::Flow => "flow",
JobKind::FlowPreview => "flowpreview",
JobKind::Dependencies => "dependencies",
JobKind::FlowDependencies => "flowdependencies",
JobKind::ScriptHub => "script_hub",
JobKind::Identity => "identity",
JobKind::Http => "http",
JobKind::Graphql => "graphql",
JobKind::Postgresql => "postgresql",
JobKind::Noop => "noop",
JobKind::AppDependencies => "appdependencies",
JobKind::DeploymentCallback => "deploymentcallback",
JobKind::SingleScriptFlow => "singlescriptflow",
JobKind::FlowScript => "flowscript",
JobKind::FlowNode => "flownode",
JobKind::AppScript => "appscript",
}
}
}
/// Script language
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ScriptLang {
Python3,
Deno,
Go,
Bash,
Postgresql,
Nativets,
Bun,
Mysql,
Bigquery,
Snowflake,
Graphql,
Powershell,
Mssql,
Php,
Bunnative,
Rust,
Ansible,
Csharp,
Oracledb,
Nu,
Java,
Duckdb,
}
impl ScriptLang {
pub fn as_str(&self) -> &'static str {
match self {
ScriptLang::Python3 => "python3",
ScriptLang::Deno => "deno",
ScriptLang::Go => "go",
ScriptLang::Bash => "bash",
ScriptLang::Postgresql => "postgresql",
ScriptLang::Nativets => "nativets",
ScriptLang::Bun => "bun",
ScriptLang::Mysql => "mysql",
ScriptLang::Bigquery => "bigquery",
ScriptLang::Snowflake => "snowflake",
ScriptLang::Graphql => "graphql",
ScriptLang::Powershell => "powershell",
ScriptLang::Mssql => "mssql",
ScriptLang::Php => "php",
ScriptLang::Bunnative => "bunnative",
ScriptLang::Rust => "rust",
ScriptLang::Ansible => "ansible",
ScriptLang::Csharp => "csharp",
ScriptLang::Oracledb => "oracledb",
ScriptLang::Nu => "nu",
ScriptLang::Java => "java",
ScriptLang::Duckdb => "duckdb",
}
}
pub fn from_str(s: &str) -> Option<Self> {
match s {
"python3" => Some(ScriptLang::Python3),
"deno" => Some(ScriptLang::Deno),
"go" => Some(ScriptLang::Go),
"bash" => Some(ScriptLang::Bash),
"postgresql" => Some(ScriptLang::Postgresql),
"nativets" => Some(ScriptLang::Nativets),
"bun" => Some(ScriptLang::Bun),
"mysql" => Some(ScriptLang::Mysql),
"bigquery" => Some(ScriptLang::Bigquery),
"snowflake" => Some(ScriptLang::Snowflake),
"graphql" => Some(ScriptLang::Graphql),
"powershell" => Some(ScriptLang::Powershell),
"mssql" => Some(ScriptLang::Mssql),
"php" => Some(ScriptLang::Php),
"bunnative" => Some(ScriptLang::Bunnative),
"rust" => Some(ScriptLang::Rust),
"ansible" => Some(ScriptLang::Ansible),
"csharp" => Some(ScriptLang::Csharp),
"oracledb" => Some(ScriptLang::Oracledb),
"nu" => Some(ScriptLang::Nu),
"java" => Some(ScriptLang::Java),
"duckdb" => Some(ScriptLang::Duckdb),
_ => None,
}
}
}
/// Job status for completed jobs
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum JobStatus {
Success,
Failure,
Canceled,
Skipped,
}
impl JobStatus {
pub fn as_str(&self) -> &'static str {
match self {
JobStatus::Success => "success",
JobStatus::Failure => "failure",
JobStatus::Canceled => "canceled",
JobStatus::Skipped => "skipped",
}
}
pub fn from_str(s: &str) -> Option<Self> {
match s {
"success" => Some(JobStatus::Success),
"failure" => Some(JobStatus::Failure),
"canceled" => Some(JobStatus::Canceled),
"skipped" => Some(JobStatus::Skipped),
_ => None,
}
}
}
/// Preview job request (simplified from windmill-api Preview struct)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PreviewRequest {
pub content: String,
pub language: ScriptLang,
#[serde(default)]
pub args: serde_json::Value,
pub lock: Option<String>,
pub tag: Option<String>,
}
/// Flow preview request
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowPreviewRequest {
pub value: serde_json::Value, // FlowValue as JSON
#[serde(default)]
pub args: serde_json::Value,
pub tag: Option<String>,
}
/// A queued job (combines v2_job and v2_job_queue)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueuedJob {
pub id: Uuid,
pub workspace_id: String,
pub kind: JobKind,
pub script_lang: Option<ScriptLang>,
pub raw_code: Option<String>,
pub raw_lock: Option<String>,
pub raw_flow: Option<serde_json::Value>,
pub args: serde_json::Value,
pub tag: String,
pub created_at: DateTime<Utc>,
pub scheduled_for: DateTime<Utc>,
pub running: bool,
pub parent_job: Option<Uuid>,
pub root_job: Option<Uuid>,
pub flow_step_id: Option<String>,
}
/// A completed job
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompletedJob {
pub id: Uuid,
pub workspace_id: String,
pub status: JobStatus,
pub result: serde_json::Value,
pub started_at: Option<DateTime<Utc>>,
pub completed_at: DateTime<Utc>,
pub duration_ms: Option<i64>,
}
/// Push a preview job to the queue
pub async fn push_preview(db: &LocalDb, req: PreviewRequest) -> Result<Uuid> {
let id = Uuid::new_v4();
let now = Utc::now().to_rfc3339();
let args_json = serde_json::to_string(&req.args)?;
let tag = req.tag.as_deref().unwrap_or("deno");
// Insert into v2_job
db.execute(
r#"
INSERT INTO v2_job (id, kind, script_lang, raw_code, raw_lock, args, tag, created_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
"#,
libsql::params![
id.to_string(),
JobKind::Preview.as_str(),
req.language.as_str(),
req.content,
req.lock,
args_json,
tag,
now.clone(),
],
)
.await?;
// Insert into v2_job_queue
db.execute(
r#"
INSERT INTO v2_job_queue (id, tag, created_at, scheduled_for, running)
VALUES (?1, ?2, ?3, ?4, 0)
"#,
libsql::params![id.to_string(), tag, now.clone(), now],
)
.await?;
// Insert into v2_job_runtime (for heartbeat tracking)
db.execute(
"INSERT INTO v2_job_runtime (id) VALUES (?1)",
libsql::params![id.to_string()],
)
.await?;
// Insert into job_perms (simplified)
db.execute(
"INSERT INTO job_perms (job_id) VALUES (?1)",
libsql::params![id.to_string()],
)
.await?;
tracing::info!("Pushed preview job: {}", id);
Ok(id)
}
/// Push a flow preview job to the queue
pub async fn push_flow_preview(db: &LocalDb, req: FlowPreviewRequest) -> Result<Uuid> {
let id = Uuid::new_v4();
let now = Utc::now().to_rfc3339();
let args_json = serde_json::to_string(&req.args)?;
let flow_json = serde_json::to_string(&req.value)?;
let tag = req.tag.as_deref().unwrap_or("flow");
// Insert into v2_job
db.execute(
r#"
INSERT INTO v2_job (id, kind, raw_flow, args, tag, created_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)
"#,
libsql::params![
id.to_string(),
JobKind::FlowPreview.as_str(),
flow_json,
args_json,
tag,
now.clone(),
],
)
.await?;
// Insert into v2_job_queue
db.execute(
r#"
INSERT INTO v2_job_queue (id, tag, created_at, scheduled_for, running)
VALUES (?1, ?2, ?3, ?4, 0)
"#,
libsql::params![id.to_string(), tag, now.clone(), now],
)
.await?;
// Insert into v2_job_runtime
db.execute(
"INSERT INTO v2_job_runtime (id) VALUES (?1)",
libsql::params![id.to_string()],
)
.await?;
// Insert into job_perms
db.execute(
"INSERT INTO job_perms (job_id) VALUES (?1)",
libsql::params![id.to_string()],
)
.await?;
// Insert initial flow status
db.execute(
"INSERT INTO v2_job_status (id, flow_status) VALUES (?1, '{}')",
libsql::params![id.to_string()],
)
.await?;
tracing::info!("Pushed flow preview job: {}", id);
Ok(id)
}
/// Get a completed job result (for polling)
pub async fn get_completed_job(db: &LocalDb, id: Uuid) -> Result<Option<CompletedJob>> {
let mut rows = db
.query(
r#"
SELECT id, workspace_id, status, result, started_at, completed_at, duration_ms
FROM v2_job_completed
WHERE id = ?1
"#,
libsql::params![id.to_string()],
)
.await?;
if let Some(row) = rows.next().await? {
let status_str: String = row.get(2)?;
let status = JobStatus::from_str(&status_str)
.ok_or_else(|| LocalError::InvalidJobState(status_str))?;
let result_str: Option<String> = row.get(3)?;
let result: serde_json::Value = result_str
.map(|s| serde_json::from_str(&s))
.transpose()?
.unwrap_or(serde_json::Value::Null);
let started_at_str: Option<String> = row.get(4)?;
let started_at = started_at_str
.map(|s| DateTime::parse_from_rfc3339(&s).map(|dt| dt.with_timezone(&Utc)))
.transpose()
.ok()
.flatten();
let completed_at_str: String = row.get(5)?;
let completed_at = DateTime::parse_from_rfc3339(&completed_at_str)
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(|_| Utc::now());
let duration_ms: Option<i64> = row.get(6)?;
Ok(Some(CompletedJob {
id,
workspace_id: row.get(1)?,
status,
result,
started_at,
completed_at,
duration_ms,
}))
} else {
Ok(None)
}
}
/// Mark a job as completed with result
pub async fn complete_job(
db: &LocalDb,
id: Uuid,
status: JobStatus,
result: serde_json::Value,
started_at: DateTime<Utc>,
) -> Result<()> {
let now = Utc::now();
let duration_ms = (now - started_at).num_milliseconds();
let result_json = serde_json::to_string(&result)?;
db.execute(
r#"
INSERT INTO v2_job_completed (id, workspace_id, status, result, started_at, completed_at, duration_ms)
SELECT ?1, workspace_id, ?2, ?3, ?4, ?5, ?6
FROM v2_job WHERE id = ?1
"#,
libsql::params![
id.to_string(),
status.as_str(),
result_json,
started_at.to_rfc3339(),
now.to_rfc3339(),
duration_ms,
],
)
.await?;
// Remove from queue
db.execute(
"DELETE FROM v2_job_queue WHERE id = ?1",
libsql::params![id.to_string()],
)
.await?;
tracing::info!("Completed job {}: {:?}", id, status);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_push_preview() {
let db = LocalDb::in_memory().await.unwrap();
let req = PreviewRequest {
content: "export function main() { return 42; }".to_string(),
language: ScriptLang::Deno,
args: serde_json::json!({}),
lock: None,
tag: None,
};
let id = push_preview(&db, req).await.unwrap();
// Verify job exists in queue
let mut rows = db
.query(
"SELECT running FROM v2_job_queue WHERE id = ?1",
libsql::params![id.to_string()],
)
.await
.unwrap();
let row = rows.next().await.unwrap().unwrap();
let running: i64 = row.get(0).unwrap();
assert_eq!(running, 0);
}
#[tokio::test]
async fn test_complete_job() {
let db = LocalDb::in_memory().await.unwrap();
let req = PreviewRequest {
content: "export function main() { return 42; }".to_string(),
language: ScriptLang::Deno,
args: serde_json::json!({}),
lock: None,
tag: None,
};
let id = push_preview(&db, req).await.unwrap();
let started_at = Utc::now();
complete_job(
&db,
id,
JobStatus::Success,
serde_json::json!(42),
started_at,
)
.await
.unwrap();
// Verify job is in completed
let completed = get_completed_job(&db, id).await.unwrap().unwrap();
assert_eq!(completed.status, JobStatus::Success);
assert_eq!(completed.result, serde_json::json!(42));
}
}

View File

@@ -0,0 +1,31 @@
//! Windmill Local Mode
//!
//! This crate provides a minimal local mode for Windmill using libSQL (SQLite/Turso)
//! instead of PostgreSQL. The goal is to support preview execution end-to-end
//! with a lightweight, embedded database.
//!
//! ## Scope
//! - Script preview execution
//! - Flow preview execution
//! - In-memory or file-based SQLite storage
//! - Remote Turso database support for multi-writer scenarios
//!
//! ## Non-goals (for this experiment)
//! - Full feature parity with PostgreSQL mode
//! - Multi-worker support (single embedded worker)
//! - Persistence of scripts/flows (only jobs)
pub mod db;
pub mod schema;
pub mod jobs;
pub mod queue;
pub mod executor;
pub mod flow_executor;
pub mod worker;
pub mod server;
pub mod error;
pub use db::LocalDb;
pub use error::LocalError;
pub use worker::Worker;
pub use server::LocalServer;

View File

@@ -0,0 +1,296 @@
//! Queue operations for local mode
//!
//! Since local mode uses a single worker, we don't need the complex
//! `FOR UPDATE SKIP LOCKED` mechanism. Instead, we use simple atomic
//! operations with the database lock.
use chrono::{DateTime, Utc};
use uuid::Uuid;
use crate::db::LocalDb;
use crate::error::{LocalError, Result};
use crate::jobs::{JobKind, QueuedJob, ScriptLang};
/// Pull the next job from the queue
///
/// This is simplified from the PostgreSQL version since we have a single worker
/// and use the connection mutex for coordination.
pub async fn pull_job(db: &LocalDb) -> Result<Option<QueuedJob>> {
let now = Utc::now().to_rfc3339();
// Get the next job (ordered by priority, then scheduled_for)
// We use a transaction-like approach: SELECT then UPDATE
let mut rows = db
.query(
r#"
SELECT q.id, j.workspace_id, j.kind, j.script_lang, j.raw_code, j.raw_lock,
j.raw_flow, j.args, q.tag, j.created_at, q.scheduled_for,
j.parent_job, j.root_job, j.flow_step_id
FROM v2_job_queue q
JOIN v2_job j ON q.id = j.id
WHERE q.running = 0 AND q.scheduled_for <= ?1
ORDER BY q.priority DESC, q.scheduled_for ASC
LIMIT 1
"#,
libsql::params![now],
)
.await?;
let Some(row) = rows.next().await? else {
return Ok(None);
};
let id_str: String = row.get(0)?;
let id = Uuid::parse_str(&id_str).map_err(|e| LocalError::InvalidJobState(e.to_string()))?;
// Mark as running
let started_at = Utc::now().to_rfc3339();
db.execute(
"UPDATE v2_job_queue SET running = 1, started_at = ?2 WHERE id = ?1",
libsql::params![id_str.clone(), started_at],
)
.await?;
// Parse the job fields
let kind_str: String = row.get(2)?;
let kind = match kind_str.as_str() {
"preview" => JobKind::Preview,
"flowpreview" => JobKind::FlowPreview,
"script" => JobKind::Script,
"flow" => JobKind::Flow,
"flowscript" => JobKind::FlowScript,
"flownode" => JobKind::FlowNode,
_ => JobKind::Preview, // Default
};
let lang_str: Option<String> = row.get(3)?;
let script_lang = lang_str.and_then(|s| ScriptLang::from_str(&s));
let raw_code: Option<String> = row.get(4)?;
let raw_lock: Option<String> = row.get(5)?;
let raw_flow_str: Option<String> = row.get(6)?;
let raw_flow = raw_flow_str
.map(|s| serde_json::from_str(&s))
.transpose()?;
let args_str: Option<String> = row.get(7)?;
let args: serde_json::Value = args_str
.map(|s| serde_json::from_str(&s))
.transpose()?
.unwrap_or(serde_json::Value::Object(serde_json::Map::new()));
let tag: String = row.get(8)?;
let created_at_str: String = row.get(9)?;
let created_at = DateTime::parse_from_rfc3339(&created_at_str)
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(|_| Utc::now());
let scheduled_for_str: String = row.get(10)?;
let scheduled_for = DateTime::parse_from_rfc3339(&scheduled_for_str)
.map(|dt| dt.with_timezone(&Utc))
.unwrap_or_else(|_| Utc::now());
let parent_job_str: Option<String> = row.get(11)?;
let parent_job = parent_job_str.and_then(|s| Uuid::parse_str(&s).ok());
let root_job_str: Option<String> = row.get(12)?;
let root_job = root_job_str.and_then(|s| Uuid::parse_str(&s).ok());
let flow_step_id: Option<String> = row.get(13)?;
Ok(Some(QueuedJob {
id,
workspace_id: row.get(1)?,
kind,
script_lang,
raw_code,
raw_lock,
raw_flow,
args,
tag,
created_at,
scheduled_for,
running: true,
parent_job,
root_job,
flow_step_id,
}))
}
/// Get queue statistics
pub async fn queue_stats(db: &LocalDb) -> Result<QueueStats> {
let mut rows = db
.query(
r#"
SELECT
COUNT(*) as total,
SUM(CASE WHEN running = 1 THEN 1 ELSE 0 END) as running,
SUM(CASE WHEN running = 0 THEN 1 ELSE 0 END) as pending
FROM v2_job_queue
"#,
(),
)
.await?;
let row = rows.next().await?.ok_or(LocalError::QueueEmpty)?;
Ok(QueueStats {
total: row.get::<i64>(0)? as u64,
running: row.get::<i64>(1).unwrap_or(0) as u64,
pending: row.get::<i64>(2).unwrap_or(0) as u64,
})
}
#[derive(Debug, Clone)]
pub struct QueueStats {
pub total: u64,
pub running: u64,
pub pending: u64,
}
/// Update job heartbeat (ping)
pub async fn ping_job(db: &LocalDb, id: Uuid) -> Result<()> {
let now = Utc::now().to_rfc3339();
db.execute(
"UPDATE v2_job_runtime SET ping = ?2 WHERE id = ?1",
libsql::params![id.to_string(), now],
)
.await?;
Ok(())
}
/// Update flow status for a running flow job
pub async fn update_flow_status(
db: &LocalDb,
id: Uuid,
flow_status: &serde_json::Value,
) -> Result<()> {
let status_json = serde_json::to_string(flow_status)?;
db.execute(
"UPDATE v2_job_status SET flow_status = ?2 WHERE id = ?1",
libsql::params![id.to_string(), status_json],
)
.await?;
Ok(())
}
/// Push a child job for flow execution
pub async fn push_flow_child_job(
db: &LocalDb,
parent_id: Uuid,
root_id: Uuid,
step_id: &str,
kind: JobKind,
script_lang: Option<ScriptLang>,
raw_code: Option<&str>,
args: &serde_json::Value,
) -> Result<Uuid> {
let id = Uuid::new_v4();
let now = Utc::now().to_rfc3339();
let args_json = serde_json::to_string(args)?;
// Insert into v2_job
db.execute(
r#"
INSERT INTO v2_job (id, kind, script_lang, raw_code, args, parent_job, root_job, flow_step_id, created_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)
"#,
libsql::params![
id.to_string(),
kind.as_str(),
script_lang.map(|l| l.as_str()),
raw_code,
args_json,
parent_id.to_string(),
root_id.to_string(),
step_id,
now.clone(),
],
)
.await?;
// Insert into v2_job_queue
db.execute(
r#"
INSERT INTO v2_job_queue (id, tag, created_at, scheduled_for, running)
VALUES (?1, 'flow', ?2, ?3, 0)
"#,
libsql::params![id.to_string(), now.clone(), now],
)
.await?;
// Insert into v2_job_runtime
db.execute(
"INSERT INTO v2_job_runtime (id) VALUES (?1)",
libsql::params![id.to_string()],
)
.await?;
Ok(id)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::jobs::{push_preview, PreviewRequest};
#[tokio::test]
async fn test_pull_job() {
let db = LocalDb::in_memory().await.unwrap();
// Push a job
let req = PreviewRequest {
content: "export function main() { return 42; }".to_string(),
language: ScriptLang::Deno,
args: serde_json::json!({}),
lock: None,
tag: None,
};
let pushed_id = push_preview(&db, req).await.unwrap();
// Pull it
let job = pull_job(&db).await.unwrap().unwrap();
assert_eq!(job.id, pushed_id);
assert!(job.running);
assert_eq!(job.kind, JobKind::Preview);
// Queue should now be empty (job is running)
let job2 = pull_job(&db).await.unwrap();
assert!(job2.is_none());
}
#[tokio::test]
async fn test_queue_stats() {
let db = LocalDb::in_memory().await.unwrap();
// Initially empty
let stats = queue_stats(&db).await.unwrap();
assert_eq!(stats.total, 0);
// Push two jobs
let req = PreviewRequest {
content: "test".to_string(),
language: ScriptLang::Deno,
args: serde_json::json!({}),
lock: None,
tag: None,
};
push_preview(&db, req.clone()).await.unwrap();
push_preview(&db, req).await.unwrap();
let stats = queue_stats(&db).await.unwrap();
assert_eq!(stats.total, 2);
assert_eq!(stats.pending, 2);
assert_eq!(stats.running, 0);
// Pull one
pull_job(&db).await.unwrap();
let stats = queue_stats(&db).await.unwrap();
assert_eq!(stats.total, 2);
assert_eq!(stats.pending, 1);
assert_eq!(stats.running, 1);
}
}

View File

@@ -0,0 +1,218 @@
//! SQLite schema for local mode
//!
//! This is a minimal schema supporting preview job execution.
//! Key differences from PostgreSQL:
//! - ENUMs are TEXT with CHECK constraints
//! - JSONB is JSON (stored as TEXT in SQLite)
//! - Arrays are JSON arrays
//! - No FOR UPDATE SKIP LOCKED (single worker, in-process coordination)
/// SQL to create the minimal schema for local mode preview execution
pub const SCHEMA: &str = r#"
-- Job kinds (equivalent to PostgreSQL ENUM)
-- Values: script, preview, flow, flowpreview, dependencies, flowdependencies,
-- script_hub, identity, http, graphql, postgresql, noop, appdependencies,
-- deploymentcallback, singlescriptflow, flowscript, flownode, appscript
-- Job status (equivalent to PostgreSQL ENUM)
-- Values: success, failure, canceled, skipped
-- Script languages (equivalent to PostgreSQL ENUM)
-- Values: python3, deno, go, bash, postgresql, nativets, bun, mysql, bigquery,
-- snowflake, graphql, powershell, mssql, php, bunnative, rust, ansible,
-- csharp, oracledb, nu, java, duckdb
-- Main job table (minimal for preview)
CREATE TABLE IF NOT EXISTS v2_job (
id TEXT PRIMARY KEY, -- UUID as TEXT
workspace_id TEXT NOT NULL DEFAULT 'local',
-- Raw code for preview jobs
raw_code TEXT,
raw_lock TEXT,
raw_flow TEXT, -- JSON for flow definitions
-- Job metadata
tag TEXT DEFAULT 'deno',
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
created_by TEXT NOT NULL DEFAULT 'local_user',
-- Permission context (simplified for local mode)
permissioned_as TEXT NOT NULL DEFAULT 'u/local_user',
permissioned_as_email TEXT DEFAULT 'local@windmill.local',
-- Job type info
kind TEXT NOT NULL DEFAULT 'preview' CHECK (kind IN (
'script', 'preview', 'flow', 'flowpreview', 'dependencies',
'flowdependencies', 'script_hub', 'identity', 'http', 'graphql',
'postgresql', 'noop', 'appdependencies', 'deploymentcallback',
'singlescriptflow', 'flowscript', 'flownode', 'appscript'
)),
-- Script execution details
script_lang TEXT CHECK (script_lang IN (
'python3', 'deno', 'go', 'bash', 'postgresql', 'nativets', 'bun',
'mysql', 'bigquery', 'snowflake', 'graphql', 'powershell', 'mssql',
'php', 'bunnative', 'rust', 'ansible', 'csharp', 'oracledb', 'nu',
'java', 'duckdb'
)),
-- Flow execution details
parent_job TEXT, -- UUID reference
root_job TEXT, -- UUID reference
flow_step INTEGER,
flow_step_id TEXT,
flow_innermost_root_job TEXT,
-- Execution settings
timeout INTEGER,
priority INTEGER DEFAULT 0,
same_worker INTEGER DEFAULT 0, -- BOOLEAN as INTEGER
visible_to_owner INTEGER DEFAULT 1,
-- Arguments (JSON)
args TEXT, -- JSON object
-- Pre-run error if validation failed
pre_run_error TEXT
);
-- Job queue table
CREATE TABLE IF NOT EXISTS v2_job_queue (
id TEXT PRIMARY KEY, -- UUID, references v2_job.id
workspace_id TEXT NOT NULL DEFAULT 'local',
-- Timestamps
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
started_at TEXT,
scheduled_for TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
-- Queue state
running INTEGER NOT NULL DEFAULT 0, -- BOOLEAN
canceled_by TEXT,
canceled_reason TEXT,
-- Suspend state (for approval flows)
suspend INTEGER DEFAULT 0,
suspend_until TEXT,
-- Execution settings
tag TEXT DEFAULT 'deno',
priority INTEGER DEFAULT 0,
worker TEXT,
FOREIGN KEY (id) REFERENCES v2_job(id)
);
-- Index for queue ordering (simulates queue_sort_v2)
CREATE INDEX IF NOT EXISTS idx_queue_sort ON v2_job_queue (
priority DESC, scheduled_for ASC, tag
) WHERE running = 0;
-- Job runtime tracking (heartbeat/ping)
CREATE TABLE IF NOT EXISTS v2_job_runtime (
id TEXT PRIMARY KEY, -- UUID, references v2_job.id
ping TEXT, -- Timestamp
memory_peak INTEGER,
FOREIGN KEY (id) REFERENCES v2_job(id)
);
-- Completed jobs with results
CREATE TABLE IF NOT EXISTS v2_job_completed (
id TEXT PRIMARY KEY, -- UUID, references v2_job.id
workspace_id TEXT NOT NULL DEFAULT 'local',
-- Timing
started_at TEXT,
completed_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
duration_ms INTEGER,
-- Result
result TEXT, -- JSON
result_columns TEXT, -- JSON array of column names
-- Status
status TEXT NOT NULL DEFAULT 'success' CHECK (status IN (
'success', 'failure', 'canceled', 'skipped'
)),
-- Cancellation details
canceled_by TEXT,
canceled_reason TEXT,
-- Flow status (for flow jobs)
flow_status TEXT, -- JSON
-- Execution details
memory_peak INTEGER,
worker TEXT,
deleted INTEGER DEFAULT 0, -- BOOLEAN
FOREIGN KEY (id) REFERENCES v2_job(id)
);
-- Index for completed job lookup by workspace and time
CREATE INDEX IF NOT EXISTS idx_completed_workspace_time ON v2_job_completed (
workspace_id, completed_at DESC
);
-- Flow status tracking (separate from completed to allow updates during execution)
CREATE TABLE IF NOT EXISTS v2_job_status (
id TEXT PRIMARY KEY, -- UUID, references v2_job.id
flow_status TEXT, -- JSON object tracking flow module execution
flow_leaf_jobs TEXT, -- JSON object
workflow_as_code_status TEXT, -- JSON object
FOREIGN KEY (id) REFERENCES v2_job(id)
);
-- Simplified job permissions (for local mode, mostly unused)
CREATE TABLE IF NOT EXISTS job_perms (
job_id TEXT PRIMARY KEY,
email TEXT DEFAULT 'local@windmill.local',
username TEXT DEFAULT 'local_user',
is_admin INTEGER DEFAULT 1, -- BOOLEAN
is_operator INTEGER DEFAULT 0, -- BOOLEAN
workspace_id TEXT DEFAULT 'local',
groups TEXT, -- JSON array
folders TEXT, -- JSON array of objects
FOREIGN KEY (job_id) REFERENCES v2_job(id)
);
-- Simple audit log (optional for local mode)
CREATE TABLE IF NOT EXISTS audit (
id INTEGER PRIMARY KEY AUTOINCREMENT,
workspace_id TEXT DEFAULT 'local',
timestamp TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
username TEXT DEFAULT 'local_user',
operation TEXT NOT NULL,
action_kind TEXT CHECK (action_kind IN ('create', 'update', 'delete', 'execute')),
resource TEXT,
parameters TEXT -- JSON
);
-- Job logs storage
CREATE TABLE IF NOT EXISTS job_logs (
job_id TEXT PRIMARY KEY,
workspace_id TEXT DEFAULT 'local',
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
logs TEXT,
log_offset INTEGER DEFAULT 0,
FOREIGN KEY (job_id) REFERENCES v2_job(id)
);
"#;
/// SQL to drop all tables (for testing/reset)
pub const DROP_SCHEMA: &str = r#"
DROP TABLE IF EXISTS job_logs;
DROP TABLE IF EXISTS audit;
DROP TABLE IF EXISTS job_perms;
DROP TABLE IF EXISTS v2_job_status;
DROP TABLE IF EXISTS v2_job_completed;
DROP TABLE IF EXISTS v2_job_runtime;
DROP TABLE IF EXISTS v2_job_queue;
DROP TABLE IF EXISTS v2_job;
"#;

View File

@@ -0,0 +1,419 @@
//! HTTP server for local mode
//!
//! Provides a minimal API compatible with Windmill's preview endpoints.
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use axum::{
extract::{Path, State},
http::StatusCode,
response::IntoResponse,
routing::{get, post},
Json, Router,
};
use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use tower_http::cors::{Any, CorsLayer};
use tower_http::trace::TraceLayer;
use uuid::Uuid;
use crate::db::LocalDb;
use crate::error::Result;
use crate::jobs::{
get_completed_job, push_flow_preview, push_preview, FlowPreviewRequest,
JobStatus, PreviewRequest, ScriptLang,
};
use crate::worker::Worker;
/// Application state shared across handlers
pub struct AppState {
pub db: Arc<LocalDb>,
}
/// Local server that runs the API and embedded worker
pub struct LocalServer {
db: Arc<LocalDb>,
addr: SocketAddr,
}
impl LocalServer {
/// Create a new local server
pub async fn new(addr: SocketAddr) -> Result<Self> {
let db = Arc::new(LocalDb::in_memory().await?);
Ok(Self { db, addr })
}
/// Create a local server with a file-based database
pub async fn with_file(addr: SocketAddr, db_path: &str) -> Result<Self> {
let db = Arc::new(LocalDb::file(db_path).await?);
Ok(Self { db, addr })
}
/// Create a local server connected to a remote Turso database
pub async fn with_turso(addr: SocketAddr, url: &str, auth_token: &str) -> Result<Self> {
let db = Arc::new(LocalDb::turso_remote(url, auth_token).await?);
Ok(Self { db, addr })
}
/// Run the server
pub async fn run(self) -> Result<()> {
let (shutdown_tx, shutdown_rx) = watch::channel(false);
// Start the embedded worker
let worker_db = self.db.clone();
let worker_handle = tokio::spawn(async move {
let mut worker = Worker::new(worker_db, shutdown_rx);
if let Err(e) = worker.run().await {
tracing::error!("Worker error: {}", e);
}
});
// Build the router
let state = Arc::new(AppState { db: self.db });
let app = create_router(state);
// Run the server
tracing::info!("Local server listening on {}", self.addr);
let listener = tokio::net::TcpListener::bind(self.addr).await.unwrap();
// Handle graceful shutdown
let shutdown_signal = async move {
tokio::signal::ctrl_c()
.await
.expect("Failed to install CTRL+C signal handler");
tracing::info!("Shutdown signal received");
shutdown_tx.send(true).ok();
};
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal)
.await
.unwrap();
// Wait for worker to finish
worker_handle.await.ok();
Ok(())
}
}
/// Create the API router
fn create_router(state: Arc<AppState>) -> Router {
let cors = CorsLayer::new()
.allow_origin(Any)
.allow_methods(Any)
.allow_headers(Any);
Router::new()
// Health check
.route("/health", get(health_check))
// Preview endpoints (mimics windmill-api)
.route("/api/w/:workspace/jobs/run/preview", post(run_preview))
.route(
"/api/w/:workspace/jobs/run_wait_result/preview",
post(run_wait_result_preview),
)
.route(
"/api/w/:workspace/jobs/run/preview_flow",
post(run_preview_flow),
)
.route(
"/api/w/:workspace/jobs/run_wait_result/preview_flow",
post(run_wait_result_preview_flow),
)
// Get job result
.route(
"/api/w/:workspace/jobs_u/completed/get_result/:job_id",
get(get_job_result),
)
.layer(TraceLayer::new_for_http())
.layer(cors)
.with_state(state)
}
// === Request/Response Types ===
#[derive(Debug, Deserialize)]
struct PreviewPayload {
content: String,
language: String,
#[serde(default)]
args: serde_json::Value,
lock: Option<String>,
tag: Option<String>,
}
#[derive(Debug, Deserialize)]
struct FlowPreviewPayload {
value: serde_json::Value,
#[serde(default)]
args: serde_json::Value,
tag: Option<String>,
}
#[derive(Debug, Serialize)]
struct JobCreatedResponse {
job_id: String,
}
#[derive(Debug, Serialize)]
struct ErrorResponse {
error: String,
}
// === Handlers ===
async fn health_check() -> &'static str {
"OK"
}
/// Run a preview job (async - returns job ID)
async fn run_preview(
State(state): State<Arc<AppState>>,
Path(_workspace): Path<String>,
Json(payload): Json<PreviewPayload>,
) -> impl IntoResponse {
let lang = match ScriptLang::from_str(&payload.language) {
Some(l) => l,
None => {
return (
StatusCode::BAD_REQUEST,
Json(ErrorResponse {
error: format!("Unknown language: {}", payload.language),
}),
)
.into_response();
}
};
let req = PreviewRequest {
content: payload.content,
language: lang,
args: payload.args,
lock: payload.lock,
tag: payload.tag,
};
match push_preview(&state.db, req).await {
Ok(job_id) => (
StatusCode::CREATED,
Json(JobCreatedResponse {
job_id: job_id.to_string(),
}),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
.into_response(),
}
}
/// Run a preview job and wait for result
async fn run_wait_result_preview(
State(state): State<Arc<AppState>>,
Path(_workspace): Path<String>,
Json(payload): Json<PreviewPayload>,
) -> impl IntoResponse {
let lang = match ScriptLang::from_str(&payload.language) {
Some(l) => l,
None => {
return (
StatusCode::BAD_REQUEST,
Json(serde_json::json!({"error": format!("Unknown language: {}", payload.language)})),
)
.into_response();
}
};
let req = PreviewRequest {
content: payload.content,
language: lang,
args: payload.args,
lock: payload.lock,
tag: payload.tag,
};
let job_id = match push_preview(&state.db, req).await {
Ok(id) => id,
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response();
}
};
// Poll for result with timeout
wait_for_result(&state.db, job_id, Duration::from_secs(60)).await
}
/// Run a flow preview job (async - returns job ID)
async fn run_preview_flow(
State(state): State<Arc<AppState>>,
Path(_workspace): Path<String>,
Json(payload): Json<FlowPreviewPayload>,
) -> impl IntoResponse {
let req = FlowPreviewRequest {
value: payload.value,
args: payload.args,
tag: payload.tag,
};
match push_flow_preview(&state.db, req).await {
Ok(job_id) => (
StatusCode::CREATED,
Json(JobCreatedResponse {
job_id: job_id.to_string(),
}),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
.into_response(),
}
}
/// Run a flow preview job and wait for result
async fn run_wait_result_preview_flow(
State(state): State<Arc<AppState>>,
Path(_workspace): Path<String>,
Json(payload): Json<FlowPreviewPayload>,
) -> impl IntoResponse {
let req = FlowPreviewRequest {
value: payload.value,
args: payload.args,
tag: payload.tag,
};
let job_id = match push_flow_preview(&state.db, req).await {
Ok(id) => id,
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response();
}
};
// Poll for result with timeout
wait_for_result(&state.db, job_id, Duration::from_secs(120)).await
}
/// Get the result of a completed job
async fn get_job_result(
State(state): State<Arc<AppState>>,
Path((_workspace, job_id)): Path<(String, String)>,
) -> impl IntoResponse {
let job_id = match Uuid::parse_str(&job_id) {
Ok(id) => id,
Err(_) => {
return (
StatusCode::BAD_REQUEST,
Json(serde_json::json!({"error": "Invalid job ID"})),
)
.into_response();
}
};
match get_completed_job(&state.db, job_id).await {
Ok(Some(job)) => {
if job.status == JobStatus::Success {
(StatusCode::OK, Json(job.result)).into_response()
} else {
(StatusCode::INTERNAL_SERVER_ERROR, Json(job.result)).into_response()
}
}
Ok(None) => (
StatusCode::NOT_FOUND,
Json(serde_json::json!({"error": "Job not found or not completed"})),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response(),
}
}
/// Poll for job completion with timeout
async fn wait_for_result(
db: &LocalDb,
job_id: Uuid,
timeout: Duration,
) -> axum::response::Response {
let start = std::time::Instant::now();
let fast_poll_duration = Duration::from_secs(2);
let fast_poll_interval = Duration::from_millis(50);
let slow_poll_interval = Duration::from_millis(200);
loop {
if start.elapsed() > timeout {
return (
StatusCode::REQUEST_TIMEOUT,
Json(serde_json::json!({"error": "Timeout waiting for job result"})),
)
.into_response();
}
match get_completed_job(db, job_id).await {
Ok(Some(job)) => {
if job.status == JobStatus::Success {
return (StatusCode::OK, Json(job.result)).into_response();
} else {
return (StatusCode::INTERNAL_SERVER_ERROR, Json(job.result)).into_response();
}
}
Ok(None) => {
// Job not completed yet, keep polling
let interval = if start.elapsed() < fast_poll_duration {
fast_poll_interval
} else {
slow_poll_interval
};
tokio::time::sleep(interval).await;
}
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({"error": e.to_string()})),
)
.into_response();
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use axum::body::Body;
use axum::http::Request;
use tower::ServiceExt;
#[tokio::test]
async fn test_health_check() {
let db = Arc::new(LocalDb::in_memory().await.unwrap());
let state = Arc::new(AppState { db });
let app = create_router(state);
let response = app
.oneshot(Request::builder().uri("/health").body(Body::empty()).unwrap())
.await
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
}
}

View File

@@ -0,0 +1,204 @@
//! Worker for local mode
//!
//! A single embedded worker that pulls jobs from the queue and executes them.
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use chrono::Utc;
use crate::db::LocalDb;
use crate::error::Result;
use crate::executor::{execute_script, ExecutionResult};
use crate::flow_executor;
use crate::jobs::{complete_job, JobKind, JobStatus, QueuedJob};
use crate::queue::pull_job;
use windmill_common::flows::FlowValue;
/// Worker that processes jobs from the queue
pub struct Worker {
db: Arc<LocalDb>,
/// Channel to signal shutdown
shutdown_rx: watch::Receiver<bool>,
}
impl Worker {
/// Create a new worker
pub fn new(db: Arc<LocalDb>, shutdown_rx: watch::Receiver<bool>) -> Self {
Self { db, shutdown_rx }
}
/// Run the worker loop
pub async fn run(&mut self) -> Result<()> {
tracing::info!("Worker started");
loop {
// Check for shutdown signal
if *self.shutdown_rx.borrow() {
tracing::info!("Worker received shutdown signal");
break;
}
// Try to pull a job
match pull_job(&self.db).await {
Ok(Some(job)) => {
tracing::info!("Processing job: {} (kind: {:?})", job.id, job.kind);
if let Err(e) = self.process_job(job).await {
tracing::error!("Error processing job: {}", e);
}
}
Ok(None) => {
// No jobs available, wait a bit before polling again
tokio::select! {
_ = tokio::time::sleep(Duration::from_millis(100)) => {}
_ = self.shutdown_rx.changed() => {}
}
}
Err(e) => {
tracing::error!("Error pulling job: {}", e);
tokio::time::sleep(Duration::from_millis(500)).await;
}
}
}
tracing::info!("Worker stopped");
Ok(())
}
/// Process a single job
async fn process_job(&self, job: QueuedJob) -> Result<()> {
let started_at = Utc::now();
match job.kind {
JobKind::Preview => {
self.process_preview_job(job, started_at).await
}
JobKind::FlowPreview => {
self.process_flow_preview_job(job, started_at).await
}
_ => {
// Unsupported job kind
let error_result = serde_json::json!({
"error": format!("Unsupported job kind in local mode: {:?}", job.kind)
});
complete_job(&self.db, job.id, JobStatus::Failure, error_result, started_at).await
}
}
}
/// Process a script preview job
async fn process_preview_job(&self, job: QueuedJob, started_at: chrono::DateTime<Utc>) -> Result<()> {
let Some(code) = &job.raw_code else {
let error_result = serde_json::json!({"error": "No code provided for preview"});
return complete_job(&self.db, job.id, JobStatus::Failure, error_result, started_at).await;
};
let Some(lang) = job.script_lang else {
let error_result = serde_json::json!({"error": "No language specified for preview"});
return complete_job(&self.db, job.id, JobStatus::Failure, error_result, started_at).await;
};
// Execute the script
let exec_result = execute_script(lang, code, &job.args).await;
match exec_result {
Ok(ExecutionResult { success, result, logs }) => {
tracing::debug!("Job {} logs:\n{}", job.id, logs);
let status = if success { JobStatus::Success } else { JobStatus::Failure };
complete_job(&self.db, job.id, status, result, started_at).await
}
Err(e) => {
let error_result = serde_json::json!({"error": e.to_string()});
complete_job(&self.db, job.id, JobStatus::Failure, error_result, started_at).await
}
}
}
/// Process a flow preview job using the full flow executor
async fn process_flow_preview_job(&self, job: QueuedJob, started_at: chrono::DateTime<Utc>) -> Result<()> {
let Some(flow_json) = &job.raw_flow else {
let error_result = serde_json::json!({"error": "No flow definition provided"});
return complete_job(&self.db, job.id, JobStatus::Failure, error_result, started_at).await;
};
// Parse the flow value using windmill-common types
let flow_value: FlowValue = match serde_json::from_value(flow_json.clone()) {
Ok(fv) => fv,
Err(e) => {
let error_result = serde_json::json!({"error": format!("Failed to parse flow: {}", e)});
return complete_job(&self.db, job.id, JobStatus::Failure, error_result, started_at).await;
}
};
if flow_value.modules.is_empty() {
let error_result = serde_json::json!({"error": "Flow has no modules"});
return complete_job(&self.db, job.id, JobStatus::Failure, error_result, started_at).await;
}
tracing::info!("Executing flow with {} modules", flow_value.modules.len());
// Execute the flow using the full flow executor
match flow_executor::execute_flow(&self.db, &flow_value, job.args.clone()).await {
Ok((result, status)) => {
let is_failure = status.failure_module.is_some();
let final_result = serde_json::json!({
"result": result,
"flow_status": status
});
let job_status = if is_failure {
JobStatus::Failure
} else {
JobStatus::Success
};
complete_job(&self.db, job.id, job_status, final_result, started_at).await
}
Err(e) => {
let error_result = serde_json::json!({"error": e.to_string()});
complete_job(&self.db, job.id, JobStatus::Failure, error_result, started_at).await
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::jobs::{get_completed_job, push_preview, PreviewRequest, ScriptLang};
#[tokio::test]
async fn test_worker_processes_bash_preview() {
let db = Arc::new(LocalDb::in_memory().await.unwrap());
let (shutdown_tx, shutdown_rx) = watch::channel(false);
// Push a bash preview job
let req = PreviewRequest {
content: "echo 42".to_string(),
language: ScriptLang::Bash,
args: serde_json::json!({}),
lock: None,
tag: None,
};
let job_id = push_preview(&db, req).await.unwrap();
// Create and run worker for one iteration
let mut worker = Worker::new(db.clone(), shutdown_rx);
// Process one job then shutdown
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(500)).await;
shutdown_tx.send(true).unwrap();
});
worker.run().await.unwrap();
// Check the job completed
let completed = get_completed_job(&db, job_id).await.unwrap();
assert!(completed.is_some());
let completed = completed.unwrap();
assert_eq!(completed.status, JobStatus::Success);
// Output "42" is parsed as JSON number
assert_eq!(completed.result, serde_json::json!(42));
}
}