rust tests cleanup (#6573)

* test improvements

* improve tests

* improve tests
This commit is contained in:
Ruben Fiszel
2025-09-10 14:43:10 +00:00
committed by GitHub
parent b8a2371a44
commit cf70265ee9
8 changed files with 2640 additions and 2398 deletions

28
backend/run_until_fail.sh Executable file
View File

@@ -0,0 +1,28 @@
#!/bin/bash
# Run the command repeatedly until it fails (exits with non-zero code)
while true; do
DISABLE_EMBEDDING=true \
RUST_LOG=info \
DENO_PATH=$(which deno) \
BUN_PATH=$(which bun) \
GO_PATH=$(which go) \
UV_PATH=$(which uv) \
CARGO_PATH=$(which cargo) \
cargo test --features enterprise,deno_core,license,python,rust,scoped_cache \
-- --nocapture --test-threads=8 | tee /tmp/test.log
# Capture the exit code of the cargo test command (not tee)
# We need to use PIPESTATUS to get the exit code of cargo test, not tee
EXIT_CODE=${PIPESTATUS[0]}
# Check if the command failed (non-zero exit code)
if [ $EXIT_CODE -ne 0 ]; then
echo "Command failed with exit code: $EXIT_CODE"
echo "Test output saved to /tmp/test.log"
exit $EXIT_CODE
fi
echo "Test passed, running again..."
echo "----------------------------------------"
done

662
backend/tests/common/mod.rs Normal file
View File

@@ -0,0 +1,662 @@
#![allow(dead_code)]
use std::{future::Future, str::FromStr, sync::Arc};
use futures::Stream;
use serde::Serialize;
use serde_json::json;
use sqlx::{postgres::PgListener, Pool, Postgres};
use tokio::sync::RwLock;
use uuid::Uuid;
use windmill_api_client::types::NewScript;
#[cfg(feature = "python")]
use windmill_common::flow_status::FlowStatusModule;
use windmill_common::{jobs::{JobKind, JobPayload, RawCode}, jwt::JWT_SECRET, scripts::{ ScriptHash, ScriptLang}, worker::WORKER_CONFIG, KillpillSender};
use windmill_queue::PushIsolationLevel;
/// it's important this is unique between tests as there is one prometheus registry and
/// run_worker shouldn't register the same metric with the same worker name more than once.
///
/// this must fit in varchar(50)
fn next_worker_name() -> String {
use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
static ID: AtomicUsize = AtomicUsize::new(0);
// n.b.: when tests are run with RUST_TEST_THREADS or --test-threads set to 1, the name
// will be "main"... The id provides uniqueness & thread_name gives context.
let id = ID.fetch_add(1, SeqCst);
let thread = std::thread::current();
let thread_name = thread
.name()
.map(|s| {
s.len()
.checked_sub(39)
.and_then(|start| s.get(start..))
.unwrap_or(s)
})
.unwrap_or("no thread name");
format!("{id}/worker-{thread_name}")
}
pub struct ApiServer {
pub addr: std::net::SocketAddr,
#[allow(unused)]
tx: tokio::sync::broadcast::Sender<()>,
#[allow(unused)]
task: tokio::task::JoinHandle<anyhow::Result<()>>,
}
impl ApiServer {
pub async fn start(db: Pool<Postgres>) -> anyhow::Result<Self> {
let (tx, rx) = tokio::sync::broadcast::channel::<()>(1);
let sock = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.map_err(|e| anyhow::anyhow!("failed to bind TCP listener: {}", e))?;
let addr = sock
.local_addr()
.map_err(|e| anyhow::anyhow!("failed to get local address: {}", e))?;
drop(sock);
let (port_tx, _port_rx) = tokio::sync::oneshot::channel::<String>();
let name = next_worker_name();
tracing::info!("starting api server for name={name}");
let task = tokio::task::spawn(windmill_api::run_server(
db.clone(),
None,
None,
addr,
rx,
port_tx,
false,
false,
format!("http://localhost:{}", addr.port()),
Some(name.clone()),
));
tracing::info!("waiting for server port for name={name}");
_port_rx.await.map_err(|e| {
tracing::error!("failed to receive port for name={name}: {e}");
anyhow::anyhow!("failed to receive port for name={name}: {e}")
})?;
// clear the cache between tests
windmill_common::cache::clear();
Ok(Self { addr, tx, task })
}
#[allow(unused)]
pub async fn close(self) -> anyhow::Result<()> {
println!("closing api server");
let Self { tx, task, .. } = self;
drop(tx);
task.await.unwrap()
}
}
pub struct RunJob {
pub payload: JobPayload,
pub args: serde_json::Map<String, serde_json::Value>,
}
impl From<JobPayload> for RunJob {
fn from(payload: JobPayload) -> Self {
Self { payload, args: Default::default() }
}
}
impl RunJob {
pub fn arg<S: Into<String>>(mut self, k: S, v: serde_json::Value) -> Self {
self.args.insert(k.into(), v);
self
}
pub async fn push(self, db: &Pool<Postgres>) -> Uuid {
let RunJob { payload, args } = self;
let mut hm_args = std::collections::HashMap::new();
for (k, v) in args {
hm_args.insert(k, windmill_common::worker::to_raw_value(&v));
}
let tx = PushIsolationLevel::IsolatedRoot(db.clone());
let (uuid, tx) = windmill_queue::push(
&db,
tx,
"test-workspace",
payload,
windmill_queue::PushArgs::from(&hm_args),
/* user */ "test-user",
/* email */ "test@windmill.dev",
/* permissioned_as */ "u/test-user".to_string(),
/* token_prefix */ None,
/* scheduled_for_o */ None,
/* schedule_path */ None,
/* parent_job */ None,
/* root job */ None,
/* flow_innermost_root_job */ None,
/* job_id */ None,
/* is_flow_step */ false,
/* same_worker */ false,
None,
true,
None,
None,
None,
None,
None,
false,
)
.await
.expect("push has to succeed");
tx.commit().await.unwrap();
uuid
}
/// push the job, spawn a worker, wait until the job is in completed_job
pub async fn run_until_complete(self, db: &Pool<Postgres>, port: u16) -> CompletedJob {
let uuid = self.push(db).await;
let listener = listen_for_completed_jobs(db).await;
in_test_worker(db, listener.find(&uuid), port).await;
let r = completed_job(uuid, db).await;
r
}
/// push the job, spawn a worker, wait until the job is in completed_job
pub async fn run_until_complete_with<F: Future<Output = ()>>(
self,
db: &Pool<Postgres>,
port: u16,
test: impl Fn(Uuid) -> F,
) -> CompletedJob {
let uuid = self.push(db).await;
let listener = listen_for_completed_jobs(db).await;
test(uuid).await;
in_test_worker(db, listener.find(&uuid), port).await;
let r = completed_job(uuid, db).await;
r
}
}
pub async fn run_job_in_new_worker_until_complete(
db: &Pool<Postgres>,
job: JobPayload,
port: u16,
) -> CompletedJob {
RunJob::from(job).run_until_complete(db, port).await
}
/// Start a worker with a timeout and run a future, until the worker quits or we time out.
///
/// Cleans up the worker before resolving.
pub async fn in_test_worker<Fut: std::future::Future>(
db: &Pool<Postgres>,
inner: Fut,
port: u16,
) -> <Fut as std::future::Future>::Output {
set_jwt_secret().await;
let (quit, worker) = spawn_test_worker(db, port);
let worker = tokio::time::timeout(std::time::Duration::from_secs(60), worker);
tokio::pin!(worker);
let res = tokio::select! {
biased;
res = inner => res,
res = &mut worker => match
res.expect("worker timed out")
.expect("worker panicked") {
_ => panic!("worker quit early"),
},
};
/* ensure the worker quits before we return */
quit.send();
let _: () = worker
.await
.expect("worker timed out")
.expect("worker panicked");
res
}
pub fn spawn_test_worker(
db: &Pool<Postgres>,
port: u16,
) -> (KillpillSender, tokio::task::JoinHandle<()>) {
std::fs::DirBuilder::new()
.recursive(true)
.create(windmill_worker::GO_BIN_CACHE_DIR)
.expect("could not create initial worker dir");
let (tx, rx) = KillpillSender::new(1);
let db = db.to_owned();
let worker_instance: &str = "test worker instance";
let worker_name: String = next_worker_name();
let ip: &str = Default::default();
let tx2 = tx.clone();
let future = async move {
let base_internal_url = format!("http://localhost:{}", port);
{
let mut wc = WORKER_CONFIG.write().await;
(*wc).worker_tags = windmill_common::worker::DEFAULT_TAGS.clone();
(*wc).priority_tags_sorted = vec![windmill_common::worker::PriorityTags {
priority: 0,
tags: (*wc).worker_tags.clone(),
}];
windmill_common::worker::store_suspended_pull_query(&wc).await;
windmill_common::worker::store_pull_query(&wc).await;
}
windmill_worker::run_worker(
&db.into(),
worker_instance,
worker_name,
1,
1,
ip,
rx,
tx2,
&base_internal_url,
)
.await
};
(tx, tokio::task::spawn(future))
}
pub async fn listen_for_completed_jobs(db: &Pool<Postgres>) -> impl Stream<Item = Uuid> + Unpin {
listen_for_uuid_on(db, "completed").await
}
#[cfg(feature = "deno_core")]
pub async fn listen_for_queue(db: &Pool<Postgres>) -> impl Stream<Item = Uuid> + Unpin {
listen_for_uuid_on(db, "queued").await
}
pub async fn listen_for_uuid_on(
db: &Pool<Postgres>,
channel: &'static str,
) -> impl Stream<Item = Uuid> + Unpin {
let mut listener = PgListener::connect_with(db).await.unwrap();
listener.listen(channel).await.unwrap();
Box::pin(futures::stream::unfold(listener, |mut listener| async move {
let uuid = listener
.try_recv()
.await
.unwrap()
.expect("lost database connection")
.payload()
.parse::<Uuid>()
.expect("invalid uuid");
Some((uuid, listener))
}))
}
pub async fn completed_job(uuid: Uuid, db: &Pool<Postgres>) -> CompletedJob {
sqlx::query_as::<_, CompletedJob>(
"SELECT *, result->'wm_labels' as labels FROM v2_as_completed_job WHERE id = $1",
)
.bind(uuid)
.fetch_one(db)
.await
.unwrap()
}
#[axum::async_trait(?Send)]
pub trait StreamFind: futures::Stream + Unpin + Sized {
async fn find(self, item: &Self::Item) -> Option<Self::Item>
where
for<'l> &'l Self::Item: std::cmp::PartialEq,
{
use futures::{future::ready, StreamExt};
self.filter(|i| ready(i == item)).next().await
}
}
impl<T: futures::Stream + Unpin + Sized> StreamFind for T {}
#[cfg(feature = "python")]
pub fn get_module(cjob: &CompletedJob, id: &str) -> Option<FlowStatusModule> {
cjob.flow_status.clone().and_then(|fs| {
use windmill_common::flow_status::FlowStatus;
find_module_in_vec(
serde_json::from_value::<FlowStatus>(fs).unwrap().modules,
id,
)
})
}
#[cfg(feature = "python")]
fn find_module_in_vec(modules: Vec<FlowStatusModule>, id: &str) -> Option<FlowStatusModule> {
modules.into_iter().find(|s| s.id() == id)
}
pub async fn set_jwt_secret() -> () {
let secret = "mytestsecret".to_string();
let mut l = JWT_SECRET.write().await;
*l = secret;
}
#[derive(Debug, sqlx::FromRow, Serialize)]
pub struct CompletedJob {
pub workspace_id: String,
pub id: Uuid,
pub parent_job: Option<Uuid>,
pub created_by: String,
pub created_at: chrono::DateTime<chrono::Utc>,
pub started_at: chrono::DateTime<chrono::Utc>,
pub duration_ms: i64,
pub success: bool,
pub script_path: Option<String>,
pub args: Option<serde_json::Value>,
pub result: Option<serde_json::Value>,
pub logs: Option<String>,
pub deleted: bool,
pub raw_code: Option<String>,
pub canceled: bool,
pub canceled_by: Option<String>,
pub canceled_reason: Option<String>,
pub schedule_path: Option<String>,
pub permissioned_as: String,
pub flow_status: Option<serde_json::Value>,
pub raw_flow: Option<serde_json::Value>,
pub is_flow_step: bool,
pub is_skipped: bool,
pub email: String,
pub visible_to_owner: bool,
pub mem_peak: Option<i32>,
pub tag: String,
pub script_hash: Option<ScriptHash>,
pub language: Option<ScriptLang>,
pub job_kind: JobKind,
}
impl CompletedJob {
pub fn json_result(&self) -> Option<serde_json::Value> {
self.result.clone()
}
}
pub async fn initialize_tracing() {
use std::sync::Once;
static ONCE: Once = Once::new();
ONCE.call_once(|| {
let _ = windmill_common::tracing_init::initialize_tracing(
"test",
&windmill_common::utils::Mode::Standalone,
"test",
);
});
}
pub async fn test_for_versions<F: Future<Output = ()>>(
version_flags: impl Iterator<Item = Arc<RwLock<bool>>>,
test: impl Fn() -> F,
) {
for version_flag in version_flags {
*version_flag.write().await = true;
test().await;
}
}
use futures::StreamExt;
// #[cfg(feature = "python")]
pub async fn assert_lockfile(
db: &Pool<Postgres>,
script_content: String,
language: ScriptLang,
expected_lines: Vec<&str>,
) -> anyhow::Result<()> {
initialize_tracing().await;
let server = ApiServer::start(db.clone()).await?;
let port = server.addr.port();
let client = windmill_api_client::create_client(
&format!("http://localhost:{port}"),
"SECRET_TOKEN".to_string(),
);
client
.create_script(
"test-workspace",
&NewScript {
language: windmill_api_client::types::ScriptLang::from_str(language.as_str()).unwrap(),
content: script_content,
path: "f/system/test_import".to_string(),
concurrent_limit: None,
concurrency_time_window_s: None,
cache_ttl: None,
dedicated_worker: None,
description: "".to_string(),
draft_only: None,
envs: vec![],
is_template: None,
kind: None,
parent_hash: None,
lock: None,
summary: "".to_string(),
tag: None,
schema: std::collections::HashMap::new(),
ws_error_handler_muted: Some(false),
priority: None,
delete_after_use: None,
timeout: None,
restart_unless_cancelled: None,
deployment_message: None,
concurrency_key: None,
visible_to_runner_only: None,
no_main_func: None,
codebase: None,
has_preprocessor: None,
on_behalf_of_email: None,
assets: vec![],
},
)
.await
.unwrap();
let mut completed = listen_for_completed_jobs(&db).await;
let db2 = db.clone();
in_test_worker(
&db,
async move {
completed.next().await; // deployed script
let script = sqlx::query!(
"SELECT hash FROM script WHERE path = $1",
"f/system/test_import".to_string()
)
.fetch_one(&db2)
.await
.unwrap();
let job = RunJob::from(JobPayload::Dependencies {
path: "f/system/test_import".to_string(),
hash: ScriptHash(script.hash),
dedicated_worker: None,
language,
})
.push(&db2)
.await;
completed.next().await; // completed job
let result = completed_job(job, &db2).await.json_result().unwrap();
assert_eq!(
result,
json!({
"lock": expected_lines.join("\n"),
"status": "Successful lock file generation"
})
);
},
port,
)
.await;
Ok(())
}
pub async fn run_deployed_relative_imports(
db: &Pool<Postgres>,
script_content: String,
language: ScriptLang,
) -> anyhow::Result<()> {
initialize_tracing().await;
let server = ApiServer::start(db.clone()).await?;
let port = server.addr.port();
let client = windmill_api_client::create_client(
&format!("http://localhost:{port}"),
"SECRET_TOKEN".to_string(),
);
client
.create_script(
"test-workspace",
&NewScript {
language: windmill_api_client::types::ScriptLang::from_str(language.as_str()).unwrap(),
content: script_content,
path: "f/system/test_import".to_string(),
concurrent_limit: None,
concurrency_time_window_s: None,
cache_ttl: None,
dedicated_worker: None,
description: "".to_string(),
draft_only: None,
envs: vec![],
is_template: None,
kind: None,
parent_hash: None,
lock: None,
summary: "".to_string(),
tag: None,
schema: std::collections::HashMap::new(),
ws_error_handler_muted: Some(false),
priority: None,
delete_after_use: None,
timeout: None,
restart_unless_cancelled: None,
deployment_message: None,
concurrency_key: None,
visible_to_runner_only: None,
no_main_func: None,
codebase: None,
has_preprocessor: None,
on_behalf_of_email: None,
assets: vec![],
},
)
.await
.unwrap();
let mut completed = listen_for_completed_jobs(&db).await;
let db2 = db.clone();
in_test_worker(
&db,
async move {
completed.next().await; // deployed script
let script = sqlx::query!(
"SELECT hash FROM script WHERE path = $1",
"f/system/test_import".to_string()
)
.fetch_one(&db2)
.await
.unwrap();
let job = RunJob::from(JobPayload::ScriptHash {
path: "f/system/test_import".to_string(),
hash: ScriptHash(script.hash),
custom_concurrency_key: None,
concurrent_limit: None,
concurrency_time_window_s: None,
cache_ttl: None,
dedicated_worker: None,
language,
priority: None,
apply_preprocessor: false,
})
.push(&db2)
.await;
completed.next().await; // completed job
let result = completed_job(job, &db2).await.json_result().unwrap();
assert_eq!(
result,
serde_json::json!([
"f/system/same_folder_script",
"f/system/same_folder_script",
"f/system_relative/different_folder_script",
"f/system_relative/different_folder_script"
])
);
},
port,
)
.await;
Ok(())
}
pub async fn run_preview_relative_imports(
db: &Pool<Postgres>,
script_content: String,
language: ScriptLang,
) -> anyhow::Result<()> {
initialize_tracing().await;
let server = ApiServer::start(db.clone()).await?;
let port = server.addr.port();
let mut completed = listen_for_completed_jobs(&db).await;
let db2 = db.clone();
in_test_worker(
&db,
async move {
let job = RunJob::from(JobPayload::Code(RawCode {
hash: None,
content: script_content,
path: Some("f/system/test_import".to_string()),
language,
lock: None,
custom_concurrency_key: None,
concurrent_limit: None,
concurrency_time_window_s: None,
cache_ttl: None,
dedicated_worker: None,
}))
.push(&db2)
.await;
completed.next().await; // completed job
let result = completed_job(job, &db2).await.json_result().unwrap();
assert_eq!(
result,
serde_json::json!([
"f/system/same_folder_script",
"f/system/same_folder_script",
"f/system_relative/different_folder_script",
"f/system_relative/different_folder_script"
])
);
},
port,
)
.await;
Ok(())
}

View File

@@ -0,0 +1,756 @@
mod common;
mod job_payload {
use serde_json::json;
use std::sync::Arc;
use tokio::sync::RwLock;
use sqlx::{Pool, Postgres};
use windmill_common::scripts::{ScriptHash, ScriptLang};
use windmill_common::jobs::JobPayload;
use windmill_common::flows::{FlowValue, FlowModule, FlowModuleValue};
use windmill_common::flow_status::RestartedFrom;
use windmill_common::worker::{
MIN_VERSION_IS_AT_LEAST_1_427, MIN_VERSION_IS_AT_LEAST_1_432, MIN_VERSION_IS_AT_LEAST_1_440,
};
use crate::common::*;
pub async fn initialize_tracing() {
use std::sync::Once;
static ONCE: Once = Once::new();
ONCE.call_once(|| {
let _ = windmill_common::tracing_init::initialize_tracing(
"test",
&windmill_common::utils::Mode::Standalone,
"test",
);
});
}
use lazy_static::lazy_static;
use windmill_common::cache;
use windmill_common::flows::FlowNodeId;
lazy_static! {
static ref VERSION_FLAGS: [Arc<RwLock<bool>>; 3] = [
MIN_VERSION_IS_AT_LEAST_1_427.clone(),
MIN_VERSION_IS_AT_LEAST_1_432.clone(),
MIN_VERSION_IS_AT_LEAST_1_440.clone(),
];
}
#[cfg(feature = "deno_core")]
#[sqlx::test(fixtures("base", "hello"))]
async fn test_script_hash_payload(db: Pool<Postgres>) -> anyhow::Result<()> {
initialize_tracing().await;
let server = ApiServer::start(db.clone()).await?;
let port = server.addr.port();
let test = || async {
let result = RunJob::from(JobPayload::ScriptHash {
hash: ScriptHash(123412),
path: "f/system/hello".to_string(),
custom_concurrency_key: None,
concurrent_limit: None,
concurrency_time_window_s: None,
cache_ttl: None,
dedicated_worker: None,
language: ScriptLang::Deno,
priority: None,
apply_preprocessor: false,
})
.arg("world", json!("foo"))
.run_until_complete(&db, port)
.await
.json_result()
.unwrap();
assert_eq!(result, json!("Hello foo!"));
};
test_for_versions(VERSION_FLAGS.iter().cloned(), test).await;
Ok(())
}
#[sqlx::test(fixtures("base", "hello"))]
async fn test_script_hash_payload_with_preprocessor(db: Pool<Postgres>) -> anyhow::Result<()> {
initialize_tracing().await;
let server = ApiServer::start(db.clone()).await?;
let port = server.addr.port();
let test = || async {
let db = &db;
let job = RunJob::from(JobPayload::ScriptHash {
hash: ScriptHash(123413),
path: "f/system/hello_with_preprocessor".to_string(),
custom_concurrency_key: None,
concurrent_limit: None,
concurrency_time_window_s: None,
cache_ttl: None,
dedicated_worker: None,
language: ScriptLang::Deno,
priority: None,
apply_preprocessor: true,
})
.run_until_complete_with(db, port, |id| async move {
let job = sqlx::query!("SELECT preprocessed FROM v2_job WHERE id = $1", id)
.fetch_one(db)
.await
.unwrap();
assert_eq!(job.preprocessed, Some(false));
})
.await;
let args = job.args.as_ref().unwrap();
assert_eq!(args.get("foo"), Some(&json!("bar")));
assert_eq!(args.get("bar"), Some(&json!("baz")));
assert_eq!(job.json_result().unwrap(), json!("Hello bar baz"));
let job = sqlx::query!("SELECT preprocessed FROM v2_job WHERE id = $1", job.id)
.fetch_one(db)
.await
.unwrap();
assert_eq!(job.preprocessed, Some(true));
};
test_for_versions(VERSION_FLAGS.iter().cloned(), test).await;
Ok(())
}
#[sqlx::test(fixtures("base", "hello"))]
async fn test_flow_script_payload(db: Pool<Postgres>) -> anyhow::Result<()> {
initialize_tracing().await;
let server = ApiServer::start(db.clone()).await?;
let port = server.addr.port();
// Deploy the flow to produce the "lite" version.
let _ = RunJob::from(JobPayload::FlowDependencies {
path: "f/system/hello_with_nodes_flow".to_string(),
dedicated_worker: None,
version: 1443253234253454,
})
.run_until_complete(&db, port)
.await
.json_result()
.unwrap();
let flow_data = cache::flow::fetch_version_lite(&db, 1443253234253454)
.await
.unwrap();
let flow_value = flow_data.value();
let flow_scripts = {
async fn load(db: &Pool<Postgres>, modules: &[FlowModule]) -> Vec<FlowNodeId> {
let mut res = vec![];
for module in modules {
let value =
serde_json::from_str::<FlowModuleValue>(module.value.get()).unwrap();
match value {
FlowModuleValue::FlowScript { id, .. } => res.push(id),
FlowModuleValue::ForloopFlow { modules_node: Some(flow_node), .. } => {
let flow_data = cache::flow::fetch_flow(db, flow_node).await.unwrap();
res.extend(Box::pin(load(db, &flow_data.value().modules)).await);
}
_ => {}
}
}
res
}
load(&db, &flow_value.modules).await
};
assert_eq!(flow_scripts.len(), 2);
let test = || async {
let result = RunJob::from(JobPayload::FlowScript {
id: flow_scripts[0],
language: ScriptLang::Deno,
custom_concurrency_key: None,
concurrent_limit: None,
concurrency_time_window_s: None,
cache_ttl: None,
dedicated_worker: None,
path: "f/system/hello/test-0".into(),
})
.arg("world", json!("foo"))
.run_until_complete(&db, port)
.await
.json_result()
.unwrap();
assert_eq!(result, json!("Hello foo!"));
};
test_for_versions(VERSION_FLAGS.iter().cloned(), test).await;
let test = || async {
let result = RunJob::from(JobPayload::FlowScript {
id: flow_scripts[1],
language: ScriptLang::Deno,
custom_concurrency_key: None,
concurrent_limit: None,
concurrency_time_window_s: None,
cache_ttl: None,
dedicated_worker: None,
path: "f/system/hello/test-0".into(),
})
.arg("hello", json!("You know nothing Jean Neige"))
.run_until_complete(&db, port)
.await
.json_result()
.unwrap();
assert_eq!(
result,
json!("Did you just say \"You know nothing Jean Neige\"??!")
);
};
test_for_versions(VERSION_FLAGS.iter().cloned(), test).await;
Ok(())
}
#[cfg(feature = "deno_core")]
#[sqlx::test(fixtures("base", "hello"))]
async fn test_flow_node_payload(db: Pool<Postgres>) -> anyhow::Result<()> {
initialize_tracing().await;
let server = ApiServer::start(db.clone()).await?;
let port = server.addr.port();
// Deploy the flow to produce the "lite" version.
let _ = RunJob::from(JobPayload::FlowDependencies {
path: "f/system/hello_with_nodes_flow".to_string(),
dedicated_worker: None,
version: 1443253234253454,
})
.run_until_complete(&db, port)
.await
.json_result()
.unwrap();
let flow_data = cache::flow::fetch_version_lite(&db, 1443253234253454)
.await
.unwrap();
let flow_value = flow_data.value();
let forloop_module =
serde_json::from_str::<FlowModuleValue>(flow_value.modules[0].value.get()).unwrap();
let FlowModuleValue::ForloopFlow { modules_node: Some(id), .. } = forloop_module else {
panic!("Expected a forloop module with a flow node");
};
let test = || async {
let result = RunJob::from(JobPayload::FlowNode {
id,
path: "f/system/hello_with_nodes_flow/forloop-0".into(),
})
.arg("iter", json!({ "value": "tests", "index": 0 }))
.run_until_complete(&db, port)
.await
.json_result()
.unwrap();
assert_eq!(result, json!("Did you just say \"Hello tests!\"??!"));
};
test_for_versions(VERSION_FLAGS.iter().cloned(), test).await;
Ok(())
}
async fn test_dependencies_payload(db: Pool<Postgres>) -> anyhow::Result<()> {
initialize_tracing().await;
let server = ApiServer::start(db.clone()).await?;
let port = server.addr.port();
let result = RunJob::from(JobPayload::Dependencies {
path: "f/system/hello".to_string(),
hash: ScriptHash(123412),
language: ScriptLang::Deno,
dedicated_worker: None,
})
.run_until_complete(&db, port)
.await
.json_result()
.unwrap();
assert_eq!(
result.get("status").unwrap(),
&json!("Successful lock file generation")
);
Ok(())
}
#[sqlx::test(fixtures("base", "hello"))]
async fn test_dependencies_payload_min_1_427(db: Pool<Postgres>) -> anyhow::Result<()> {
*MIN_VERSION_IS_AT_LEAST_1_427.write().await = true;
test_dependencies_payload(db).await?;
Ok(())
}
#[sqlx::test(fixtures("base", "hello"))]
async fn test_dependencies_payload_min_1_432(db: Pool<Postgres>) -> anyhow::Result<()> {
*MIN_VERSION_IS_AT_LEAST_1_432.write().await = true;
test_dependencies_payload(db).await?;
Ok(())
}
#[sqlx::test(fixtures("base", "hello"))]
async fn test_dependencies_payload_min_1_440(db: Pool<Postgres>) -> anyhow::Result<()> {
*MIN_VERSION_IS_AT_LEAST_1_440.write().await = true;
test_dependencies_payload(db).await?;
Ok(())
}
// Just test that deploying a flow work as expected.
#[sqlx::test(fixtures("base", "hello"))]
async fn test_flow_dependencies_payload(db: Pool<Postgres>) -> anyhow::Result<()> {
initialize_tracing().await;
let server = ApiServer::start(db.clone()).await?;
let port = server.addr.port();
let test = || async {
let result = RunJob::from(JobPayload::FlowDependencies {
path: "f/system/hello_with_nodes_flow".to_string(),
dedicated_worker: None,
version: 1443253234253454,
})
.run_until_complete(&db, port)
.await
.json_result()
.unwrap();
assert_eq!(
result.get("status").unwrap(),
&json!("Successful lock file generation")
);
};
test_for_versions(VERSION_FLAGS.iter().cloned(), test).await;
Ok(())
}
#[sqlx::test(fixtures("base", "hello"))]
async fn test_raw_flow_dependencies_payload(db: Pool<Postgres>) -> anyhow::Result<()> {
initialize_tracing().await;
let server = ApiServer::start(db.clone()).await?;
let port = server.addr.port();
let test = || async {
let result = RunJob::from(JobPayload::RawFlowDependencies {
path: "none".to_string(),
flow_value: serde_json::from_value(json!({
"modules": [{
"id": "a",
"value": {
"type": "rawscript",
"content": r#"export function main(world: string) {
const greet = `Hello ${world}!`;
console.log(greet)
return greet
}"#,
"language": "deno",
"input_transforms": {
"world": { "type": "javascript", "expr": "flow_input.world" }
}
}
}],
"schema": {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"properties": { "world": { "type": "string" } },
"type": "object",
"order": [ "world" ]
}
}))
.unwrap(),
})
.arg("skip_flow_update", json!(true))
.run_until_complete(&db, port)
.await
.json_result()
.unwrap();
let result = RunJob::from(JobPayload::RawFlow {
value: serde_json::from_value::<FlowValue>(
result.get("updated_flow_value").unwrap().clone(),
)
.unwrap(),
path: None,
restarted_from: None,
})
.arg("world", json!("Jean Neige"))
.run_until_complete(&db, port)
.await
.json_result()
.unwrap();
assert_eq!(result, json!("Hello Jean Neige!"));
};
test_for_versions(VERSION_FLAGS.iter().cloned(), test).await;
Ok(())
}
#[sqlx::test(fixtures("base", "hello"))]
async fn test_raw_script_dependencies_payload(db: Pool<Postgres>) -> anyhow::Result<()> {
initialize_tracing().await;
let server = ApiServer::start(db.clone()).await?;
let port = server.addr.port();
let test = || async {
let result = RunJob::from(JobPayload::RawScriptDependencies {
script_path: "none".into(),
content: r#"export function main(world: string) {
const greet = `Hello ${world}!`;
console.log(greet)
return greet
}"#
.into(),
language: ScriptLang::Deno,
})
.run_until_complete(&db, port)
.await
.json_result()
.unwrap();
assert_eq!(
result,
json!({ "lock": "", "status": "Successful lock file generation" })
);
};
test_for_versions(VERSION_FLAGS.iter().cloned(), test).await;
Ok(())
}
#[cfg(feature = "deno_core")]
#[sqlx::test(fixtures("base", "hello"))]
async fn test_flow_payload(db: Pool<Postgres>) -> anyhow::Result<()> {
initialize_tracing().await;
let server = ApiServer::start(db.clone()).await?;
let port = server.addr.port();
let test = || async {
let result = RunJob::from(JobPayload::Flow {
path: "f/system/hello_with_nodes_flow".to_string(),
dedicated_worker: None,
apply_preprocessor: false,
version: 1443253234253454,
})
.run_until_complete(&db, port)
.await
.json_result()
.unwrap();
assert_eq!(
result,
json!([
"Did you just say \"Hello foo!\"??!",
"Did you just say \"Hello bar!\"??!",
"Did you just say \"Hello baz!\"??!",
])
);
};
// Test the not "lite" flow.
test_for_versions(VERSION_FLAGS.iter().cloned(), test).await;
// Deploy the flow to produce the "lite" version.
let _ = RunJob::from(JobPayload::FlowDependencies {
path: "f/system/hello_with_nodes_flow".to_string(),
dedicated_worker: None,
version: 1443253234253454,
})
.run_until_complete(&db, port)
.await
.json_result()
.unwrap();
// Test the "lite" flow.
test_for_versions(VERSION_FLAGS.iter().cloned(), test).await;
Ok(())
}
#[cfg(feature = "deno_core")]
#[sqlx::test(fixtures("base", "hello"))]
async fn test_flow_payload_with_preprocessor(db: Pool<Postgres>) -> anyhow::Result<()> {
initialize_tracing().await;
let server = ApiServer::start(db.clone()).await?;
let port = server.addr.port();
let db = &db;
let test = || async {
use windmill_common::flow_status::{FlowStatus, FlowStatusModule};
let job = RunJob::from(JobPayload::Flow {
path: "f/system/hello_with_preprocessor".to_string(),
dedicated_worker: None,
apply_preprocessor: true,
version: 1443253234253456,
})
.run_until_complete_with(db, port, |id| async move {
let job = sqlx::query!("SELECT preprocessed FROM v2_job WHERE id = $1", id)
.fetch_one(db)
.await
.unwrap();
assert_eq!(job.preprocessed, Some(false));
})
.await;
let args = job.args.as_ref().unwrap();
let flow_status = job.flow_status.as_ref().unwrap();
assert_eq!(args.get("foo"), Some(&json!("bar")));
assert_eq!(args.get("bar"), Some(&json!("baz")));
assert_eq!(job.json_result().unwrap(), json!("Hello bar-baz"));
let job = sqlx::query!("SELECT preprocessed FROM v2_job WHERE id = $1", job.id)
.fetch_one(db)
.await
.unwrap();
assert_eq!(job.preprocessed, Some(true));
let flow_status = serde_json::from_value::<FlowStatus>(flow_status.clone()).unwrap();
let FlowStatusModule::Success { job, .. } = flow_status.preprocessor_module.unwrap()
else {
panic!("Expected a success preprocessor module");
};
let pp_id = job;
let job = sqlx::query!(
"SELECT preprocessed, script_entrypoint_override FROM v2_job WHERE id = $1",
pp_id
)
.fetch_one(db)
.await
.unwrap();
assert_eq!(job.preprocessed, Some(true));
assert_eq!(
job.script_entrypoint_override.as_deref(),
Some("preprocessor")
);
};
// Test the not "lite" flow.
test_for_versions(VERSION_FLAGS.iter().cloned(), test).await;
// Deploy the flow to produce the "lite" version.
let _ = RunJob::from(JobPayload::FlowDependencies {
path: "f/system/hello_with_preprocessor".to_string(),
dedicated_worker: None,
version: 1443253234253456,
})
.run_until_complete(db, port)
.await
.json_result()
.unwrap();
// Test the "lite" flow.
test_for_versions(VERSION_FLAGS.iter().cloned(), test).await;
Ok(())
}
#[cfg(feature = "deno_core")]
#[sqlx::test(fixtures("base", "hello"))]
async fn test_restarted_flow_payload(db: Pool<Postgres>) -> anyhow::Result<()> {
initialize_tracing().await;
let server = ApiServer::start(db.clone()).await?;
let port = server.addr.port();
let test = || async {
let completed_job_id = RunJob::from(JobPayload::Flow {
path: "f/system/hello_with_nodes_flow".to_string(),
dedicated_worker: None,
apply_preprocessor: true,
version: 1443253234253454,
})
.run_until_complete(&db, port)
.await
.id;
let result = RunJob::from(JobPayload::RestartedFlow {
completed_job_id,
step_id: "a".into(),
branch_or_iteration_n: None,
})
.arg("iter", json!({ "value": "tests", "index": 0 }))
.run_until_complete(&db, port)
.await
.json_result()
.unwrap();
assert_eq!(
result,
json!([
"Did you just say \"Hello foo!\"??!",
"Did you just say \"Hello bar!\"??!",
"Did you just say \"Hello baz!\"??!",
])
);
};
// Test the not "lite" flow.
test_for_versions(VERSION_FLAGS.iter().cloned(), test).await;
// Deploy the flow to produce the "lite" version.
let _ = RunJob::from(JobPayload::FlowDependencies {
path: "f/system/hello_with_nodes_flow".to_string(),
dedicated_worker: None,
version: 1443253234253454,
})
.run_until_complete(&db, port)
.await
.json_result()
.unwrap();
// Test the "lite" flow.
test_for_versions(VERSION_FLAGS.iter().cloned(), test).await;
Ok(())
}
#[cfg(feature = "deno_core")]
#[sqlx::test(fixtures("base", "hello"))]
async fn test_raw_flow_payload(db: Pool<Postgres>) -> anyhow::Result<()> {
initialize_tracing().await;
let server = ApiServer::start(db.clone()).await?;
let port = server.addr.port();
let test = || async {
let result = RunJob::from(JobPayload::RawFlow {
value: serde_json::from_value(json!({
"modules": [{
"id": "a",
"value": {
"type": "rawscript",
"content": r#"export function main(world: string) {
const greet = `Hello ${world}!`;
console.log(greet)
return greet
}"#,
"language": "deno",
"input_transforms": {
"world": { "type": "javascript", "expr": "flow_input.world" }
}
}
}],
"schema": {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"properties": { "world": { "type": "string" } },
"type": "object",
"order": [ "world" ]
}
}))
.unwrap(),
path: None,
restarted_from: None,
})
.arg("world", json!("Jean Neige"))
.run_until_complete(&db, port)
.await
.json_result()
.unwrap();
assert_eq!(result, json!("Hello Jean Neige!"));
};
test_for_versions(VERSION_FLAGS.iter().cloned(), test).await;
Ok(())
}
#[cfg(feature = "deno_core")]
#[sqlx::test(fixtures("base", "hello"))]
async fn test_raw_flow_payload_with_restarted_from(db: Pool<Postgres>) -> anyhow::Result<()> {
initialize_tracing().await;
let server = ApiServer::start(db.clone()).await?;
let port = server.addr.port();
let db = &db;
let test = |restarted_from, arg, result| async move {
let job = RunJob::from(JobPayload::RawFlow {
value: serde_json::from_value(json!({
"modules": [{
"id": "a",
"value": {
"type": "rawscript",
"content": r#"export function main(world: string) {
return `Hello ${world}!`;
}"#,
"language": "deno",
"input_transforms": {
"world": { "type": "javascript", "expr": "flow_input.world" }
}
}
}, {
"id": "b",
"value": {
"type": "rawscript",
"content": r#"export function main(world: string, a: string) {
return `${a} ${world}!`;
}"#,
"language": "deno",
"input_transforms": {
"world": { "type": "javascript", "expr": "flow_input.world" },
"a": { "type": "javascript", "expr": "results.a" }
}
}
}, {
"id": "c",
"value": {
"type": "forloopflow",
"iterator": { "type": "javascript", "expr": "['a', 'b', 'c']" },
"modules": [{
"value": {
"input_transforms": {
"world": { "type": "javascript", "expr": "flow_input.world" },
"b": { "type": "javascript", "expr": "results.b" },
"x": { "type": "javascript", "expr": "flow_input.iter.value" }
},
"type": "rawscript",
"language": "deno",
"content": r#"export function main(world: string, b: string, x: string) {
return `${x}: ${b} ${world}!`;
}"#,
},
}],
}
}],
"schema": {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"properties": { "world": { "type": "string" } },
"type": "object",
"order": [ "world" ]
}
}))
.unwrap(),
path: None,
restarted_from,
})
.arg("world", arg)
.run_until_complete(db, port)
.await;
assert_eq!(job.json_result().unwrap(), result);
job.id
};
let flow_job_id = test(
None,
json!("foo"),
json!([
"a: Hello foo! foo! foo!",
"b: Hello foo! foo! foo!",
"c: Hello foo! foo! foo!"
]),
)
.await;
let flow_job_id = test(
Some(RestartedFrom { flow_job_id, step_id: "a".into(), branch_or_iteration_n: None }),
json!("foo"),
json!([
"a: Hello foo! foo! foo!",
"b: Hello foo! foo! foo!",
"c: Hello foo! foo! foo!"
]),
)
.await;
let flow_job_id = test(
Some(RestartedFrom { flow_job_id, step_id: "b".into(), branch_or_iteration_n: None }),
json!("bar"),
json!([
"a: Hello foo! bar! bar!",
"b: Hello foo! bar! bar!",
"c: Hello foo! bar! bar!"
]),
)
.await;
let _ = test(
Some(RestartedFrom {
flow_job_id,
step_id: "c".into(),
branch_or_iteration_n: Some(1),
}),
json!("yolo"),
json!([
"a: Hello foo! bar! bar!",
"b: Hello foo! bar! yolo!",
"c: Hello foo! bar! yolo!"
]),
)
.await;
Ok(())
}
}

View File

@@ -0,0 +1,394 @@
mod common;
use crate::common::*;
use sqlx::Pool;
use sqlx::postgres::Postgres;
use windmill_common::scripts::{ ScriptLang};
#[cfg(feature = "python")]
#[sqlx::test(fixtures("base", "lockfile_python"))]
async fn test_requirements_python(db: Pool<Postgres>) -> anyhow::Result<()> {
let content = r#"# py: ==3.11.11
# requirements:
# tiny==0.1.3
import bar
import baz # pin: foo
import baz # repin: fee
import bug # repin: free
def main():
pass
"#
.to_string();
assert_lockfile(
&db,
content,
ScriptLang::Python3,
vec!["# py: 3.11.11", "tiny==0.1.3"],
)
.await?;
Ok(())
}
#[cfg(feature = "python")]
#[sqlx::test(fixtures("base", "lockfile_python"))]
async fn test_extra_requirements_python(db: Pool<Postgres>) -> anyhow::Result<()> {
{
use windmill_common::scripts::ScriptLang;
let content = r#"# py: ==3.11.11
# extra_requirements:
# tiny
import f.system.extra_requirements
import tiny # pin: tiny==0.1.0
import tiny # pin: tiny==0.1.1
import tiny # repin: tiny==0.1.2
def main():
pass
"#
.to_string();
assert_lockfile(
&db,
content,
ScriptLang::Python3,
vec!["# py: 3.11.11", "bottle==0.13.2", "tiny==0.1.2"],
)
.await?;
}
Ok(())
}
#[cfg(feature = "python")]
#[sqlx::test(fixtures("base", "lockfile_python"))]
async fn test_extra_requirements_python2(db: Pool<Postgres>) -> anyhow::Result<()> {
let content = r#"# py: ==3.11.11
# extra_requirements:
# tiny==0.1.3
import simplejson # pin: simplejson==3.20.1
def main():
pass
"#
.to_string();
assert_lockfile(
&db,
content,
ScriptLang::Python3,
vec!["# py: 3.11.11", "simplejson==3.20.1", "tiny==0.1.3"],
)
.await?;
Ok(())
}
#[cfg(feature = "python")]
#[sqlx::test(fixtures("base", "lockfile_python"))]
async fn test_pins_python(db: Pool<Postgres>) -> anyhow::Result<()> {
let content = r#"# py: ==3.11.11
# extra_requirements:
# tiny==0.1.3
# bottle==0.13.2
import f.system.requirements
import f.system.pins
import tiny # repin: tiny==0.1.3
import simplejson
def main():
pass
"#
.to_string();
assert_lockfile(
&db,
content,
ScriptLang::Python3,
vec![
"# py: 3.11.11",
"bottle==0.13.2",
"microdot==2.2.0",
"simplejson==3.19.3",
"tiny==0.1.3",
],
)
.await?;
Ok(())
}
#[cfg(feature = "python")]
#[sqlx::test(fixtures("base", "multipython"))]
async fn test_multipython_python(db: Pool<Postgres>) -> anyhow::Result<()> {
let content = r#"# py: <=3.12.2, >=3.12.0
import f.multipython.script1
import f.multipython.aliases
"#
.to_string();
assert_lockfile(&db, content, ScriptLang::Python3, vec!["# py: 3.12.1\n"]).await?;
Ok(())
}
#[cfg(feature = "python")]
#[sqlx::test(fixtures("base", "multipython"))]
async fn test_inline_script_metadata_python(db: Pool<Postgres>) -> anyhow::Result<()> {
let content = r#"# py_select_latest
# /// script
# requires-python = ">3.11,<3.12.3,!=3.12.2"
# dependencies = [
# "tiny==0.1.3",
# ]
# ///
"#
.to_string();
assert_lockfile(
&db,
content,
ScriptLang::Python3,
vec!["# py: 3.12.1", "tiny==0.1.3"],
)
.await?;
Ok(())
}
#[cfg(feature = "python")]
use windmill_common::jobs::JobPayload;
#[cfg(feature = "python")]
use windmill_common::jobs::RawCode;
#[cfg(feature = "python")]
#[sqlx::test(fixtures("base"))]
async fn test_python_job(db: Pool<Postgres>) -> anyhow::Result<()> {
initialize_tracing().await;
let server = ApiServer::start(db.clone()).await?;
let port = server.addr.port();
let content = r#"
def main():
return "hello world"
"#
.to_owned();
let job = JobPayload::Code(RawCode {
hash: None,
content,
path: None,
language: ScriptLang::Python3,
lock: None,
custom_concurrency_key: None,
concurrent_limit: None,
concurrency_time_window_s: None,
cache_ttl: None,
dedicated_worker: None,
});
let result = run_job_in_new_worker_until_complete(&db, job, port)
.await
.json_result()
.unwrap();
assert_eq!(result, serde_json::json!("hello world"));
Ok(())
}
#[cfg(feature = "python")]
#[sqlx::test(fixtures("base"))]
async fn test_python_global_site_packages(db: Pool<Postgres>) -> anyhow::Result<()> {
use windmill_common::{cache::concatcp, worker::ROOT_CACHE_DIR};
initialize_tracing().await;
let server = ApiServer::start(db.clone()).await?;
let port = server.addr.port();
// Shared for all 3.12.*
let path = concatcp!(ROOT_CACHE_DIR, "python_3_12/global-site-packages").to_owned();
std::fs::create_dir_all(&path).unwrap();
std::fs::write(path + "/my_global_site_package_3_12_any.py", "").unwrap();
// 3.12
{
let content = r#"# py: ==3.12
#requirements:
#
import my_global_site_package_3_12_any
def main():
return "hello world"
"#
.to_owned();
let job = JobPayload::Code(RawCode {
hash: None,
content,
path: None,
language: ScriptLang::Python3,
lock: None,
custom_concurrency_key: None,
concurrent_limit: None,
concurrency_time_window_s: None,
cache_ttl: None,
dedicated_worker: None,
});
let result = run_job_in_new_worker_until_complete(&db, job, port)
.await
.json_result()
.unwrap();
assert_eq!(result, serde_json::json!("hello world"));
}
// 3.12.1
{
let content = r#"# py: ==3.12.1
#requirements:
#
import my_global_site_package_3_12_any
def main():
return "hello world"
"#
.to_owned();
let job = JobPayload::Code(RawCode {
hash: None,
content,
path: None,
language: ScriptLang::Python3,
lock: None,
custom_concurrency_key: None,
concurrent_limit: None,
concurrency_time_window_s: None,
cache_ttl: None,
dedicated_worker: None,
});
let result = run_job_in_new_worker_until_complete(&db, job, port)
.await
.json_result()
.unwrap();
assert_eq!(result, serde_json::json!("hello world"));
}
Ok(())
}
#[cfg(feature = "python")]
#[sqlx::test(fixtures("base"))]
async fn test_python_job_heavy_dep(db: Pool<Postgres>) -> anyhow::Result<()> {
initialize_tracing().await;
let server = ApiServer::start(db.clone()).await?;
let port = server.addr.port();
let content = r#"
import numpy as np
def main():
a = np.arange(15).reshape(3, 5)
return len(a)
"#
.to_owned();
let job = JobPayload::Code(RawCode {
hash: None,
content,
path: None,
language: ScriptLang::Python3,
lock: None,
custom_concurrency_key: None,
concurrent_limit: None,
concurrency_time_window_s: None,
cache_ttl: None,
dedicated_worker: None,
});
let result = run_job_in_new_worker_until_complete(&db, job, port)
.await
.json_result()
.unwrap();
assert_eq!(result, serde_json::json!(3));
Ok(())
}
#[cfg(feature = "python")]
#[sqlx::test(fixtures("base"))]
async fn test_python_job_with_imports(db: Pool<Postgres>) -> anyhow::Result<()> {
initialize_tracing().await;
let server = ApiServer::start(db.clone()).await?;
let port = server.addr.port();
let content = r#"
import wmill
def main():
return wmill.get_workspace()
"#
.to_owned();
let job = JobPayload::Code(RawCode {
hash: None,
content,
path: None,
language: ScriptLang::Python3,
lock: None,
custom_concurrency_key: None,
concurrent_limit: None,
concurrency_time_window_s: None,
cache_ttl: None,
dedicated_worker: None,
});
let result = run_job_in_new_worker_until_complete(&db, job, port)
.await
.json_result()
.unwrap();
assert_eq!(result, serde_json::json!("test-workspace"));
Ok(())
}
#[cfg(feature = "python")]
#[sqlx::test(fixtures("base", "relative_python"))]
async fn test_relative_imports_python(db: Pool<Postgres>) -> anyhow::Result<()> {
let content = r#"
from f.system.same_folder_script import main as test1
from .same_folder_script import main as test2
from f.system_relative.different_folder_script import main as test3
from ..system_relative.different_folder_script import main as test4
def main():
return [test1(), test2(), test3(), test4()]
"#
.to_string();
run_deployed_relative_imports(&db, content.clone(), ScriptLang::Python3).await?;
run_preview_relative_imports(&db, content, ScriptLang::Python3).await?;
Ok(())
}
#[cfg(feature = "python")]
#[sqlx::test(fixtures("base", "relative_python"))]
async fn test_nested_imports_python(db: Pool<Postgres>) -> anyhow::Result<()> {
let content = r#"
from f.system_relative.nested_script import main as test
def main():
return test()
"#
.to_string();
run_deployed_relative_imports(&db, content.clone(), ScriptLang::Python3).await?;
run_preview_relative_imports(&db, content, ScriptLang::Python3).await?;
Ok(())
}

343
backend/tests/retry.rs Normal file
View File

@@ -0,0 +1,343 @@
mod common;
#[cfg(feature = "deno_core")]
mod retry {
use serde_json::json;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use sqlx::{Pool, Postgres};
use windmill_common::flows::FlowValue;
use windmill_common::jobs::JobPayload;
use windmill_common::flow_status::FlowStatusModule;
use crate::common::*;
pub async fn initialize_tracing() {
use std::sync::Once;
static ONCE: Once = Once::new();
ONCE.call_once(|| {
let _ = windmill_common::tracing_init::initialize_tracing(
"test",
&windmill_common::utils::Mode::Standalone,
"test",
);
});
}
/// test helper provides some external state to help steps fail at specific points
struct Server {
addr: std::net::SocketAddr,
tx: tokio::sync::oneshot::Sender<()>,
task: tokio::task::JoinHandle<Vec<u8>>,
}
impl Server {
async fn start(responses: Vec<Option<u8>>) -> Self {
use tokio::net::TcpListener;
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
let sock = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = sock.local_addr().unwrap();
let task = tokio::task::spawn(async move {
tokio::pin!(rx);
let mut results = vec![];
for next in responses {
let (mut peer, _) = tokio::select! {
_ = &mut rx => break,
r = sock.accept() => r,
}
.unwrap();
let n = peer.read_u8().await.unwrap();
results.push(n);
if let Some(next) = next {
peer.write_u8(next).await.unwrap();
}
}
results
});
return Self { addr, tx, task };
}
async fn close(self) -> Vec<u8> {
let Self { task, tx, .. } = self;
drop(tx);
task.await.unwrap()
}
}
fn inner_step() -> &'static str {
r#"
export async function main(index, port) {
const buf = new Uint8Array([0]);
const sock = await Deno.connect({ port });
await sock.write(new Uint8Array([index]));
if (await sock.read(buf) != 1) throw Error("read");
return buf[0];
}
"#
}
fn last_step() -> &'static str {
r#"
def main(last, port):
with __import__("socket").create_connection((None, port)) as sock:
sock.send(b'\xff')
return last + [sock.recv(1)[0]]
"#
}
#[cfg(feature = "deno_core")]
fn flow_forloop_retry() -> FlowValue {
serde_json::from_value(serde_json::json!({
"modules": [{
"id": "a",
"value": {
"type": "forloopflow",
"iterator": { "type": "javascript", "expr": "flow_input.items" },
"skip_failures": false,
"modules": [{
"value": {
"input_transforms": {
"index": { "type": "javascript", "expr": "flow_input.iter.index" },
"port": { "type": "javascript", "expr": "flow_input.port" },
},
"type": "rawscript",
"language": "deno",
"content": inner_step(),
},
}],
},
"retry": { "constant": { "attempts": 2, "seconds": 0 } },
}, {
"value": {
"input_transforms": {
"last": { "type": "javascript", "expr": "results.a" },
"port": { "type": "javascript", "expr": "flow_input.port" },
},
"type": "rawscript",
"language": "python3",
"content": last_step(),
},
"retry": { "constant": { "attempts": 2, "seconds": 0 } },
}],
}))
.unwrap()
}
#[cfg(feature = "deno_core")]
#[sqlx::test(fixtures("base"))]
async fn test_pass(db: Pool<Postgres>) -> anyhow::Result<()> {
initialize_tracing().await;
// let server = ApiServer::start(db.clone()).await?;
/* fails twice in the loop, then once on the last step
* retry attempts is measured per-step, so it _retries_ at most two times on each step,
* which means it may run the step three times in total */
let (attempts, responses) = [
/* pass fail */
(0, Some(99)),
(1, None),
/* pass pass fail */
(0, Some(99)),
(1, Some(99)),
(2, None),
/* pass pass pass */
(0, Some(3)),
(1, Some(5)),
(2, Some(7)),
/* fail the last step once */
(0xff, None),
(0xff, Some(9)),
]
.into_iter()
.unzip::<_, _, Vec<_>, Vec<_>>();
let server = Server::start(responses).await;
let result = RunJob::from(JobPayload::RawFlow {
value: flow_forloop_retry(),
path: None,
restarted_from: None,
})
.arg("items", json!(["unused", "unused", "unused"]))
.arg("port", json!(server.addr.port()))
.run_until_complete(&db, server.addr.port())
.await
.json_result()
.unwrap();
assert_eq!(server.close().await, attempts);
assert_eq!(json!([3, 5, 7, 9]), result);
Ok(())
}
#[cfg(feature = "deno_core")]
#[sqlx::test(fixtures("base"))]
async fn test_fail_step_zero(db: Pool<Postgres>) -> anyhow::Result<()> {
initialize_tracing().await;
/* attempt and fail the first step three times and stop */
let (attempts, responses) = [
/* pass fail x3 */
(0, Some(99)),
(1, None),
(0, Some(99)),
(1, None),
(0, Some(99)),
(1, None),
]
.into_iter()
.unzip::<_, _, Vec<_>, Vec<_>>();
let server = Server::start(responses).await;
let result = RunJob::from(JobPayload::RawFlow {
value: flow_forloop_retry(),
path: None,
restarted_from: None,
})
.arg("items", json!(["unused", "unused", "unused"]))
.arg("port", json!(server.addr.port()))
.run_until_complete(&db, server.addr.port())
.await
.json_result()
.unwrap();
assert_eq!(server.close().await, attempts);
assert!(
result[1]["error"]
.as_object()
.unwrap()
.get("message")
.unwrap()
.as_str()
.unwrap()
== "read"
);
Ok(())
}
#[cfg(feature = "deno_core")]
#[sqlx::test(fixtures("base"))]
async fn test_fail_step_one(db: Pool<Postgres>) -> anyhow::Result<()> {
initialize_tracing().await;
/* attempt and fail the first step three times and stop */
let (attempts, responses) = [
/* fail once, then pass */
(0, None),
(0, Some(1)),
(1, Some(2)),
(2, Some(3)),
/* fail three times */
(0xff, None),
(0xff, None),
(0xff, None),
]
.into_iter()
.unzip::<_, _, Vec<_>, Vec<_>>();
let server = Server::start(responses).await;
let job = RunJob::from(JobPayload::RawFlow {
value: flow_forloop_retry(),
path: None,
restarted_from: None,
})
.arg("items", json!(["unused", "unused", "unused"]))
.arg("port", json!(server.addr.port()))
.run_until_complete(&db, server.addr.port())
.await;
let result = job.json_result().unwrap();
assert_eq!(server.close().await, attempts);
assert!(result["error"]
.as_object()
.unwrap()
.get("message")
.unwrap()
.as_str()
.unwrap()
.contains("index out of range"));
Ok(())
}
#[cfg(feature = "python")]
#[sqlx::test(fixtures("base"))]
async fn test_with_failure_module(db: Pool<Postgres>) -> anyhow::Result<()> {
initialize_tracing().await;
// let server = ApiServer::start(db.clone()).await?;
let value = serde_json::from_value(json!({
"modules": [{
"id": "a",
"value": {
"input_transforms": { "port": { "type": "javascript", "expr": "flow_input.port" } },
"type": "rawscript",
"language": "python3",
"content": r#"
def main(port):
with __import__("socket").create_connection((None, port)) as sock:
sock.send(b'\x00')
return sock.recv(1)[0]"#,
},
"retry": { "constant": { "attempts": 1, "seconds": 0 } },
}],
"failure_module": {
"value": {
"input_transforms": { "error": { "type": "javascript", "expr": "previous_result", },
"port": { "type": "javascript", "expr": "flow_input.port" } },
"type": "rawscript",
"language": "python3",
"content": r#"
def main(error, port):
with __import__("socket").create_connection((None, port)) as sock:
sock.send(b'\xff')
return { "recv": sock.recv(1)[0], "from failure module": error }"#,
},
"retry": { "constant": { "attempts": 1, "seconds": 0 } },
},
}))
.unwrap();
let (_attempts, responses) = [
/* fail the first step twice */
(0x00, None),
(0x00, None),
/* and the failure module once */
(0xff, None),
(0xff, Some(42)),
]
.into_iter()
.unzip::<_, _, Vec<_>, Vec<_>>();
let server = Server::start(responses).await;
let cjob = RunJob::from(JobPayload::RawFlow { value, path: None, restarted_from: None })
.arg("port", json!(server.addr.port()))
.run_until_complete(&db, server.addr.port())
.await;
let result = cjob.json_result().clone().unwrap();
let failed_module = get_module(&cjob, "a").unwrap();
match failed_module {
FlowStatusModule::Failure { .. } => {}
_ => panic!("expected failure module"),
}
println!("result: {:#?}", result);
assert_eq!(
result
.get("from failure module")
.unwrap()
.get("error")
.unwrap()
.get("name")
.unwrap()
.clone(),
json!("IndexError")
);
assert_eq!(result.get("recv").unwrap().clone(), json!(42));
Ok(())
}
}

View File

@@ -0,0 +1,302 @@
mod common;
mod suspend_resume {
#[cfg(feature = "deno_core")]
use serde_json::json;
#[cfg(feature = "deno_core")]
use crate::common::*;
#[cfg(feature = "deno_core")]
use sqlx::{Pool, Postgres};
#[cfg(feature = "deno_core")]
use sqlx::types::Uuid;
#[cfg(feature = "deno_core")]
use futures::{Stream, StreamExt};
#[cfg(feature = "deno_core")]
use windmill_common::flows::FlowValue;
#[cfg(feature = "deno_core")]
use windmill_common::jobs::JobPayload;
#[cfg(feature = "deno_core")]
pub async fn initialize_tracing() {
use std::sync::Once;
static ONCE: Once = Once::new();
ONCE.call_once(|| {
let _ = windmill_common::tracing_init::initialize_tracing(
"test",
&windmill_common::utils::Mode::Standalone,
"test",
);
});
}
#[cfg(feature = "deno_core")]
async fn wait_until_flow_suspends(
flow: Uuid,
mut queue: impl Stream<Item = Uuid> + Unpin,
db: &Pool<Postgres>,
) {
loop {
queue.by_ref().find(&flow).await.unwrap();
if sqlx::query_scalar!(
"SELECT suspend > 0 AS \"r!\" FROM v2_job_queue WHERE id = $1",
flow
)
.fetch_one(db)
.await
.unwrap()
{
break;
}
}
}
#[cfg(feature = "deno_core")]
fn flow() -> FlowValue {
serde_json::from_value(serde_json::json!({
"modules": [{
"id": "a",
"value": {
"input_transforms": {
"n": { "type": "javascript", "expr": "flow_input.n", },
"port": { "type": "javascript", "expr": "flow_input.port", },
"op": { "type": "javascript", "expr": "flow_input.op ?? 'resume'", },
},
"type": "rawscript",
"language": "deno",
"content": "\
export async function main(n, port, op) {\
const job = Deno.env.get('WM_JOB_ID');
const token = Deno.env.get('WM_TOKEN');
const r = await fetch(
`http://localhost:${port}/api/w/test-workspace/jobs/job_signature/${job}/0?token=${token}&approver=ruben`,\
{\
method: 'GET',\
headers: { 'Authorization': `Bearer ${token}` }\
}\
);\
console.log(r);\
const secret = await r.text();\
console.log('Secret: ' + secret + ' ' + job + ' ' + token);\
const r2 = await fetch(
`http://localhost:${port}/api/w/test-workspace/jobs_u/${op}/${job}/0/${secret}?approver=ruben`,\
{\
method: 'POST',\
body: JSON.stringify('from job'),\
headers: { 'content-type': 'application/json' }\
}\
);\
console.log(await r2.text());\
return n + 1;\
}",
},
"suspend": {
"required_events": 1
},
}, {
"id": "b",
"value": {
"input_transforms": {
"n": { "type": "javascript", "expr": "results.a", },
"resume": { "type": "javascript", "expr": "resume", },
"resumes": { "type": "javascript", "expr": "resumes", },
},
"type": "rawscript",
"language": "deno",
"content": "export function main(n, resume, resumes) { return { n: n + 1, resume, resumes } }"
},
"suspend": {
"required_events": 1
},
}, {
"value": {
"input_transforms": {
"last": { "type": "javascript", "expr": "results.b", },
"resume": { "type": "javascript", "expr": "resume", },
"resumes": { "type": "javascript", "expr": "resumes", },
},
"type": "rawscript",
"language": "deno",
"content": "export function main(last, resume, resumes) { return { last, resume, resumes } }"
},
}],
}))
.unwrap()
}
#[cfg(feature = "deno_core")]
#[sqlx::test(fixtures("base"))]
async fn test(db: Pool<Postgres>) -> anyhow::Result<()> {
initialize_tracing().await;
let server = ApiServer::start(db.clone()).await?;
let port = server.addr.port();
let flow =
RunJob::from(JobPayload::RawFlow { value: flow(), path: None, restarted_from: None })
.arg("n", json!(1))
.arg("port", json!(port))
.push(&db)
.await;
let mut completed = listen_for_completed_jobs(&db).await;
let queue = listen_for_queue(&db).await;
let db_ = db.clone();
in_test_worker(&db, async move {
let db = db_;
wait_until_flow_suspends(flow, queue, &db).await;
// print_job(flow, &db).await;
/* The first job resumes itself. */
let _first = completed.next().await.unwrap();
// print_job(_first, &db).await;
/* ... and send a request resume it. */
let second = completed.next().await.unwrap();
// print_job(second, &db).await;
let token = windmill_common::auth::create_token_for_owner(&db, "test-workspace", "u/test-user", "", 100, "", &Uuid::nil(), None, None).await.unwrap();
let secret = reqwest::get(format!(
"http://localhost:{port}/api/w/test-workspace/jobs/job_signature/{second}/0?token={token}&approver=ruben"
))
.await
.unwrap()
.error_for_status()
.unwrap()
.text().await.unwrap();
println!("{}", secret);
/* ImZyb20gdGVzdCIK = base64 "from test" */
reqwest::get(format!(
"http://localhost:{port}/api/w/test-workspace/jobs_u/resume/{second}/0/{secret}?payload=ImZyb20gdGVzdCIK&approver=ruben"
))
.await
.unwrap()
.error_for_status()
.unwrap();
completed.find(&flow).await.unwrap();
}, port)
.await;
server.close().await.unwrap();
let result = completed_job(flow, &db).await.json_result().unwrap();
assert_eq!(
json!({
"last": {
"resume": "from job",
"resumes": ["from job"],
"n": 3,
},
"resume": "from test",
"resumes": ["from test"],
}),
result
);
// ensure resumes are cleaned up through CASCADE when the flow is finished
assert_eq!(
0,
sqlx::query_scalar!("SELECT count(*) AS \"count!\" FROM resume_job")
.fetch_one(&db)
.await
.unwrap()
);
Ok(())
}
#[cfg(feature = "deno_core")]
#[sqlx::test(fixtures("base"))]
async fn cancel_from_job(db: Pool<Postgres>) -> anyhow::Result<()> {
initialize_tracing().await;
let server = ApiServer::start(db.clone()).await?;
let port = server.addr.port();
let result =
RunJob::from(JobPayload::RawFlow { value: flow(), path: None, restarted_from: None })
.arg("n", json!(1))
.arg("op", json!("cancel"))
.arg("port", json!(port))
.run_until_complete(&db, port)
.await
.json_result()
.unwrap();
server.close().await.unwrap();
assert_eq!(
json!( {"error": {"name": "SuspendedDisapproved", "message": "Disapproved by ruben"}}),
result
);
Ok(())
}
#[cfg(feature = "deno_core")]
#[sqlx::test(fixtures("base"))]
async fn cancel_after_suspend(db: Pool<Postgres>) -> anyhow::Result<()> {
initialize_tracing().await;
let server = ApiServer::start(db.clone()).await?;
let port = server.addr.port();
let flow =
RunJob::from(JobPayload::RawFlow { value: flow(), path: None, restarted_from: None })
.arg("n", json!(1))
.arg("port", json!(port))
.push(&db)
.await;
let mut completed = listen_for_completed_jobs(&db).await;
let queue = listen_for_queue(&db).await;
let db_ = db.clone();
in_test_worker(&db, async move {
let db = db_;
wait_until_flow_suspends(flow, queue, &db).await;
/* The first job resumes itself. */
let _first = completed.next().await.unwrap();
/* ... and send a request resume it. */
let second = completed.next().await.unwrap();
let token = windmill_common::auth::create_token_for_owner(&db, "test-workspace", "u/test-user", "", 100, "", &Uuid::nil(), None, None).await.unwrap();
let secret = reqwest::get(format!(
"http://localhost:{port}/api/w/test-workspace/jobs/job_signature/{second}/0?token={token}"
))
.await
.unwrap()
.error_for_status()
.unwrap()
.text().await.unwrap();
println!("{}", secret);
/* ImZyb20gdGVzdCIK = base64 "from test" */
reqwest::get(format!(
"http://localhost:{port}/api/w/test-workspace/jobs_u/cancel/{second}/0/{secret}?payload=ImZyb20gdGVzdCIK"
))
.await
.unwrap()
.error_for_status()
.unwrap();
completed.find(&flow).await.unwrap();
}, port)
.await;
server.close().await.unwrap();
let result = completed_job(flow, &db).await.json_result().unwrap();
assert_eq!(
json!( {"error": {"name": "SuspendedDisapproved", "message": "Disapproved by unknown"}}),
result
);
Ok(())
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -1120,3 +1120,4 @@ pub fn s3_mode_args_to_worker_data(
workspace_id: job.workspace_id.clone(),
}
}