Compare commits

...

24 Commits

Author SHA1 Message Date
Ruben Fiszel
fd25c4bf72 Merge branch 'main' into migration/queue-params 2024-11-02 19:35:19 +01:00
yacine Bouraroui
b031e2f48c Merge branch 'migration/queue-params' of https://github.com/yacineb/windmill into migration/queue-params 2024-10-04 13:35:33 +02:00
yacine Bouraroui
c95b59d5da "squash" migrations in a single one 2024-10-04 13:35:07 +02:00
Yacine
c1b779ebb5 Merge branch 'main' into migration/queue-params 2024-10-04 13:11:32 +02:00
yacine Bouraroui
7b31a900f6 Merge branch 'main' of https://github.com/yacineb/windmill into migration/queue-params 2024-10-04 13:05:44 +02:00
yacine Bouraroui
2f9cc1498d fix fetch_optional behaviour 2024-10-03 11:59:16 +02:00
yacine Bouraroui
6baa549cdb delete_job_metadata_after_use for job_args 2024-10-02 18:49:22 +02:00
yacine Bouraroui
4aa269f71f reduce boilerplate 2024-10-02 18:44:04 +02:00
yacine Bouraroui
7e5956e5a0 updated cargo.lock 2024-10-02 18:20:20 +02:00
yacine Bouraroui
7a6d404a11 trimmed some boilerplate 2024-10-02 18:19:52 +02:00
yacine Bouraroui
fd5d89b56a reduced columns queried from completed_job 2024-10-02 15:40:09 +02:00
yacine Bouraroui
2ad08d9194 missing dep 2024-10-02 15:18:11 +02:00
yacine Bouraroui
32428ed5f3 eliminate unncessary clones, memory alloc optimization 2024-10-02 15:11:56 +02:00
yacine Bouraroui
b20d83e58e fetch results from completed_job_result table 2024-10-02 14:50:31 +02:00
yacine Bouraroui
6300bb6e71 read results wip 2024-10-02 11:44:48 +02:00
yacine Bouraroui
f6d9cd062d write to completed_jobs_result on job completion 2024-10-02 10:28:01 +02:00
yacine Bouraroui
3e34bc7956 added completed_jobs_result table 2024-10-02 10:07:30 +02:00
yacine Bouraroui
045a52cded query args from job_args table first 2024-10-01 19:47:07 +02:00
yacine Bouraroui
f4df3affa3 separate job_args and job_params tables 2024-10-01 15:17:08 +02:00
yacine Bouraroui
698375cb17 added tag field, needed for postgres RLS 2024-10-01 12:41:19 +02:00
yacine Bouraroui
c7a327e1d8 resuable raw flow parsing logic. 2- enforced some queries to be compile-checked. some types "masking" and a better alternative to some 'select * from queue' 2024-10-01 12:30:11 +02:00
yacine Bouraroui
7d605e88b0 read ( args, raw_code, raw_flow) from separate table - wip 2024-09-30 18:25:29 +02:00
yacine Bouraroui
1ce19166ed create a separate params table for immutable jobs fields 2024-09-30 12:10:51 +02:00
yacine Bouraroui
a0542dc853 missing build folder in frontend breaks the build of windmill worker on first git clone 2024-09-30 10:59:09 +02:00
17 changed files with 499 additions and 284 deletions

1
backend/Cargo.lock generated
View File

@@ -10834,6 +10834,7 @@ dependencies = [
"bigdecimal",
"chrono",
"chrono-tz 0.10.0",
"const_format",
"cron",
"futures-core",
"hex",

View File

@@ -0,0 +1,3 @@
-- Down migration: drops the job_params, job_args and completed_jobs_result
-- tables. Any rows stored in them are lost when this runs.
DROP TABLE job_params;
DROP TABLE job_args;
DROP TABLE completed_jobs_result;

View File

@@ -0,0 +1,25 @@
-- Up migration: creates three tables, each keyed by a job UUID.
-- Every table also carries tag and workspace_id columns.
-- NOTE(review): the commit history says tag is needed for Postgres RLS; no
-- policy is defined in this script, so confirm it is added elsewhere.
-- NOTE(review): tag and workspace_id are nullable and no foreign keys are
-- declared here; confirm that is intentional.

-- job_params: raw_code and raw_flow of a job.
CREATE TABLE job_params (
id UUID PRIMARY KEY,
raw_code TEXT,
raw_flow jsonb NULL,
tag VARCHAR(50),
workspace_id VARCHAR(50)
);
-- job_args: the args of a job, stored as JSONB.
CREATE TABLE job_args (
id UUID PRIMARY KEY,
args JSONB,
tag VARCHAR(50),
workspace_id VARCHAR(50)
);
-- completed_jobs_result: the result of a completed job, stored as JSONB.
CREATE TABLE completed_jobs_result (
id UUID PRIMARY KEY,
result JSONB,
tag VARCHAR(50),
workspace_id VARCHAR(50)
);

View File

@@ -19,6 +19,7 @@ use tokio::{
sync::{mpsc, RwLock},
};
use uuid::Uuid;
#[cfg(feature = "embedding")]
use windmill_api::embeddings::update_embeddings_db;
use windmill_api::{
@@ -32,7 +33,7 @@ use windmill_common::{
auth::JWT_SECRET,
ee::CriticalErrorChannel,
error,
flow_status::FlowStatusModule,
flow_status::{FlowStatusModule, ParsedFlowStatusGetter as _},
global_settings::{
BASE_URL_SETTING, BUNFIG_INSTALL_SCOPES_SETTING, CRITICAL_ERROR_CHANNELS_SETTING,
DEFAULT_TAGS_PER_WORKSPACE_SETTING, DEFAULT_TAGS_WORKSPACES_SETTING,
@@ -1420,7 +1421,7 @@ async fn handle_zombie_flows(
}
);
report_critical_error(reason.clone(), db.clone()).await;
cancel_zombie_flow_job(db, flow, &rsmq, reason).await?;
cancel_zombie_flow_job(db, &flow.id, &flow.workspace_id, &rsmq, reason).await?;
}
}
@@ -1436,11 +1437,17 @@ async fn handle_zombie_flows(
.fetch_all(db)
.await?;
/// Two-column projection of a `queue` row (`id`, `workspace_id`), used in place
/// of a full `QueuedJob` when only the identifiers are needed to cancel a flow.
#[derive(sqlx::FromRow, Debug)]
struct InQueueJobResult {
// Job id of the row in `queue`.
id: uuid::Uuid,
// Workspace the job belongs to.
workspace_id: String,
}
for flow in flows2 {
let in_queue = sqlx::query_as::<_, QueuedJob>(
"SELECT * FROM queue WHERE id = $1 AND running = true AND canceled = false",
let in_queue = sqlx::query_as!(InQueueJobResult,
"SELECT id, workspace_id FROM queue WHERE id = $1 AND running = true AND canceled = false",
flow.parent_flow_id
)
.bind(flow.parent_flow_id)
.fetch_optional(db)
.await?;
if let Some(job) = in_queue {
@@ -1450,7 +1457,7 @@ async fn handle_zombie_flows(
job.workspace_id,
flow.last_ping
);
cancel_zombie_flow_job(db, job, &rsmq,
cancel_zombie_flow_job(db, &job.id, &job.workspace_id, &rsmq,
format!("Flow {} cancelled as one of the parallel branch {} was unable to make the last transition ", flow.parent_flow_id, flow.job_id))
.await?;
} else {
@@ -1462,21 +1469,22 @@ async fn handle_zombie_flows(
async fn cancel_zombie_flow_job(
db: &Pool<Postgres>,
flow: QueuedJob,
job_id: &Uuid,
workspace_id: &str,
rsmq: &Option<MultiplexedRsmq>,
message: String,
) -> Result<(), error::Error> {
let tx = db.begin().await.unwrap();
tracing::error!(
"zombie flow detected: {} in workspace {}. Cancelling it.",
flow.id,
flow.workspace_id
job_id,
workspace_id
);
let (ntx, _) = cancel_job(
"monitor",
Some(message),
flow.id,
flow.workspace_id.as_str(),
*job_id,
workspace_id,
tx,
db,
rsmq.clone(),

View File

@@ -21,11 +21,7 @@ use std::{
vec,
};
use windmill_common::{
db::UserDB,
error::JsonResult,
jobs::JobKind,
scripts::to_i64,
utils::{not_found_if_none, paginate, Pagination},
db::UserDB, error::JsonResult, jobs::JobKind, query_scalar_with_fallback, scripts::to_i64, utils::{not_found_if_none, paginate, Pagination}
};
pub fn workspaced_service() -> Router {
Router::new()
@@ -178,6 +174,7 @@ struct GetArgs {
input: Option<bool>,
allow_large: Option<bool>,
}
async fn get_args_from_history_or_saved_input(
authed: ApiAuthed,
Extension(user_db): Extension<UserDB>,
@@ -185,6 +182,7 @@ async fn get_args_from_history_or_saved_input(
Path((w_id, job_or_input_id)): Path<(String, Uuid)>,
) -> JsonResult<Option<Value>> {
let mut tx = user_db.begin(&authed).await?;
let result_o = if let Some(input) = g.input {
if input {
sqlx::query_scalar!(
@@ -196,24 +194,20 @@ async fn get_args_from_history_or_saved_input(
.fetch_optional(&mut *tx)
.await?
} else {
sqlx::query_scalar!(
query_scalar_with_fallback!(tx,
"SELECT CASE WHEN pg_column_size(args) < 40000 OR $3 THEN args ELSE '\"WINDMILL_TOO_BIG\"'::jsonb END as args FROM job_args WHERE id = $1 AND workspace_id = $2",
"SELECT CASE WHEN pg_column_size(args) < 40000 OR $3 THEN args ELSE '\"WINDMILL_TOO_BIG\"'::jsonb END as args FROM completed_job WHERE id = $1 AND workspace_id = $2",
job_or_input_id,
job_or_input_id,
w_id,
g.allow_large.unwrap_or(true)
)
.fetch_optional(&mut *tx)
.await?
g.allow_large.unwrap_or(true))?
}
} else {
sqlx::query_scalar!(
"SELECT CASE WHEN pg_column_size(args) < 40000 OR $3 THEN args ELSE '\"WINDMILL_TOO_BIG\"'::jsonb END as args FROM completed_job WHERE id = $1 AND workspace_id = $2 UNION ALL SELECT CASE WHEN pg_column_size(args) < 40000 OR $3 THEN args ELSE '\"WINDMILL_TOO_BIG\"'::jsonb END as args FROM input WHERE id = $1 AND workspace_id = $2",
job_or_input_id,
w_id,
g.allow_large.unwrap_or(true)
)
.fetch_optional(&mut *tx)
.await?
query_scalar_with_fallback!(tx,
"SELECT CASE WHEN pg_column_size(args) < 40000 OR $3 THEN args ELSE '\"WINDMILL_TOO_BIG\"'::jsonb END as args FROM job_args WHERE id = $1 AND workspace_id = $2 UNION ALL SELECT CASE WHEN pg_column_size(args) < 40000 OR $3 THEN args ELSE '\"WINDMILL_TOO_BIG\"'::jsonb END as args FROM input WHERE id = $1 AND workspace_id = $2",
"SELECT CASE WHEN pg_column_size(args) < 40000 OR $3 THEN args ELSE '\"WINDMILL_TOO_BIG\"'::jsonb END as args FROM completed_job WHERE id = $1 AND workspace_id = $2 UNION ALL SELECT CASE WHEN pg_column_size(args) < 40000 OR $3 THEN args ELSE '\"WINDMILL_TOO_BIG\"'::jsonb END as args FROM input WHERE id = $1 AND workspace_id = $2",
job_or_input_id,
w_id,
g.allow_large.unwrap_or(true))?
};
tx.commit().await?;

View File

@@ -314,9 +314,14 @@ async fn get_result_by_id(
Path((w_id, flow_id, node_id)): Path<(String, Uuid, String)>,
Query(JsonPath { json_path, .. }): Query<JsonPath>,
) -> windmill_common::error::JsonResult<Box<JsonRawValue>> {
let res =
windmill_queue::get_result_by_id(db.clone(), w_id.clone(), flow_id, node_id, json_path)
.await?;
let res = windmill_queue::get_result_by_id(
db.clone(),
&w_id,
flow_id,
&node_id,
json_path.as_deref(),
)
.await?;
log_job_view(&db, Some(&authed), &w_id, &flow_id).await?;
@@ -1349,6 +1354,8 @@ async fn cancel_jobs(
) -> error::JsonResult<Vec<Uuid>> {
let mut uuids = vec![];
let mut tx = db.begin().await?;
let result = serde_json::json!({"error": { "message": format!("Job canceled: cancel all by {username}"), "name": "Canceled", "reason": "cancel all", "canceler": username}});
let trivial_jobs = sqlx::query!("INSERT INTO completed_job AS cj
( workspace_id
, id
@@ -1412,10 +1419,19 @@ async fn cancel_jobs(
, tag
, priority FROM queue
WHERE id = any($2) AND running = false AND parent_job IS NULL AND workspace_id = $3 AND schedule_path IS NULL FOR UPDATE SKIP LOCKED
ON CONFLICT (id) DO NOTHING RETURNING id", username, &jobs, w_id, serde_json::json!({"error": { "message": format!("Job canceled: cancel all by {username}"), "name": "Canceled", "reason": "cancel all", "canceler": username}}))
ON CONFLICT (id) DO NOTHING RETURNING id", username, &jobs, w_id, &result)
.fetch_all(&mut *tx)
.await?.into_iter().map(|x| x.id).collect::<Vec<Uuid>>();
sqlx::query!(
"INSERT INTO completed_jobs_result(id, result, tag, workspace_id) SELECT id, $1, tag, $2 FROM completed_job WHERE id = any($3) ON CONFLICT (id) DO NOTHING",
result,
w_id,
&trivial_jobs,
)
.execute(&mut *tx)
.await?;
sqlx::query!(
"DELETE FROM queue WHERE id = any($1) AND workspace_id = $2",
&trivial_jobs,
@@ -1423,6 +1439,7 @@ async fn cancel_jobs(
)
.execute(&mut *tx)
.await?;
tx.commit().await?;
// sqlx::query!(
@@ -1436,18 +1453,10 @@ async fn cancel_jobs(
continue;
}
let rsmq = rsmq.clone();
match tokio::time::timeout(tokio::time::Duration::from_secs(5), async move {
if let Ok(result) = tokio::time::timeout(tokio::time::Duration::from_secs(5), async move {
let tx = db.begin().await?;
let (tx, _) = windmill_queue::cancel_job(
username,
None,
job_id.clone(),
w_id,
tx,
db,
rsmq,
false,
false,
username, None, job_id, w_id, tx, db, rsmq, false, false,
)
.await?;
tx.commit().await?;
@@ -1455,20 +1464,19 @@ async fn cancel_jobs(
})
.await
{
Ok(result) => match result {
match result {
Ok(_) => {
uuids.push(job_id);
}
Err(e) => {
tracing::error!("Failed to cancel job {:?}: {:?}", job_id, e);
}
},
Err(_) => {
tracing::error!(
"Timeout while trying to cancel job {:?} after 5 seconds",
job_id
);
}
} else {
tracing::error!(
"Timeout while trying to cancel job {:?} after 5 seconds",
job_id
);
}
}
@@ -3283,7 +3291,7 @@ async fn run_wait_result(
};
let fast_poll_duration = *WAIT_RESULT_FAST_POLL_DURATION_SECS as u64 * 1000;
let mut accumulated_delay = 0 as u64;
let mut accumulated_delay = 0_u64;
loop {
if let Some(node_id_for_empty_return) = node_id_for_empty_return.as_ref() {
@@ -3400,6 +3408,16 @@ async fn delete_job_metadata_after_use(db: &DB, job_uuid: Uuid) -> Result<(), Er
)
.execute(db)
.await?;
sqlx::query!(
"UPDATE job_args
SET args = '{}'::jsonb
WHERE id = $1",
job_uuid,
)
.execute(db)
.await?;
sqlx::query!(
"UPDATE job_logs
SET logs = '##DELETED##'
@@ -4384,11 +4402,8 @@ async fn add_batch_jobs(
tx = PushIsolationLevel::Transaction(ntx);
uuids.push(uuid);
}
match tx {
PushIsolationLevel::Transaction(tx) => {
tx.commit().await?;
}
_ => (),
if let PushIsolationLevel::Transaction(tx) = tx {
tx.commit().await?;
}
return Ok(Json(uuids));
}

View File

@@ -31,7 +31,6 @@ use windmill_audit::ActionKind;
use windmill_common::{
db::UserDB,
error::{Error, JsonResult, Result},
jobs::QueuedJob,
utils::{not_found_if_none, paginate, require_admin, Pagination, StripPath},
variables,
};
@@ -536,11 +535,29 @@ pub async fn transform_json_value<'c>(
}
Value::String(y) if y.starts_with("$") && job_id.is_some() => {
let mut tx = authed_transaction_or_default(authed, user_db.clone(), db).await?;
let job = sqlx::query_as::<_, QueuedJob>(
"SELECT * FROM queue WHERE id = $1 AND workspace_id = $2",
/// Subset of `queue` columns fetched when resolving a `$`-prefixed string value
/// for a job, so that the whole row (`SELECT *`) does not have to be loaded.
/// The field list must stay in sync with the column list of the query that
/// populates it.
#[derive(sqlx::FromRow, Debug)]
struct QueuedJobLite {
pub id: Uuid,
pub workspace_id: String,
pub parent_job: Option<Uuid>,
pub created_by: String,
pub email: String,
pub permissioned_as: String,
pub script_path: Option<String>,
pub schedule_path: Option<String>,
pub root_job: Option<Uuid>,
pub flow_step_id: Option<String>,
pub scheduled_for: chrono::DateTime<chrono::Utc>,
}
let job = sqlx::query_as!(
QueuedJobLite,
"SELECT id, workspace_id,parent_job, created_by, email, permissioned_as, script_path, schedule_path, root_job, flow_step_id, scheduled_for
FROM queue WHERE id = $1 AND workspace_id = $2",
job_id.unwrap(),
workspace
)
.bind(job_id.unwrap())
.bind(workspace)
.fetch_optional(&mut *tx)
.await?;
tx.commit().await?;

View File

@@ -45,6 +45,33 @@ pub struct FlowStatus {
pub restarted_from: Option<RestartedFrom>,
}
/// Access to the raw, still-serialized flow status JSON carried by a job-like
/// row type.
pub trait FlowStatusGetter {
/// Returns a reference to the raw flow status JSON, or `None` when the value
/// has none.
fn get_raw_flow_status(&self) -> Option<&sqlx::types::Json<Box<serde_json::value::RawValue>>>;
}
/// Implements `FlowStatusGetter` for `$struct_name` by borrowing its
/// `flow_status` field.
///
/// The target type must have a field named `flow_status` of type
/// `Option<sqlx::types::Json<Box<serde_json::value::RawValue>>>`.
/// The trait is referenced by its bare name, so `FlowStatusGetter` must be in
/// scope wherever the macro is invoked.
#[macro_export]
macro_rules! impl_flow_status_getter {
($struct_name:ident) => {
impl FlowStatusGetter for $struct_name {
fn get_raw_flow_status(
&self,
) -> Option<&sqlx::types::Json<Box<serde_json::value::RawValue>>> {
self.flow_status.as_ref()
}
}
};
}
/// Typed view over the flow status of a job-like row.
pub trait ParsedFlowStatusGetter {
/// Returns the flow status as a `FlowStatus`, or `None` when it is not
/// available.
fn parse_flow_status(&self) -> Option<FlowStatus>;
}
/// Blanket implementation: every type exposing its raw flow status through
/// [`FlowStatusGetter`] can also hand it out deserialized.
impl<T> ParsedFlowStatusGetter for T
where
    T: FlowStatusGetter,
{
    /// Deserializes the raw flow status into a [`FlowStatus`].
    ///
    /// Returns `None` when there is no raw status, and also when the JSON does
    /// not deserialize into a `FlowStatus` (the error is discarded).
    fn parse_flow_status(&self) -> Option<FlowStatus> {
        let raw = self.get_raw_flow_status()?;
        serde_json::from_str::<FlowStatus>((**raw).get()).ok()
    }
}
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
#[serde(default)]
pub struct RetryStatus {

View File

@@ -125,6 +125,34 @@ pub struct FlowValue {
pub concurrency_key: Option<String>,
}
/// Access to the raw, still-serialized flow definition JSON carried by a
/// job-like row type.
pub trait FlowValueGetter {
/// Returns a reference to the raw flow JSON, or `None` when the value has
/// none.
fn get_raw_flow_value(&self) -> Option<&sqlx::types::Json<Box<serde_json::value::RawValue>>>;
}
/// Implements `FlowValueGetter` for `$struct_name` by borrowing its `raw_flow`
/// field.
///
/// The target type must have a field named `raw_flow` of type
/// `Option<sqlx::types::Json<Box<serde_json::value::RawValue>>>`.
/// The trait is referenced by its bare name, so `FlowValueGetter` must be in
/// scope wherever the macro is invoked.
#[macro_export]
macro_rules! impl_flow_value_getter {
($struct_name:ident) => {
impl FlowValueGetter for $struct_name {
fn get_raw_flow_value(
&self,
) -> Option<&sqlx::types::Json<Box<serde_json::value::RawValue>>> {
self.raw_flow.as_ref()
}
}
};
}
/// Typed view over the raw flow definition of a job-like row.
pub trait ParsedFlowValueGetter {
/// Returns the raw flow as a `FlowValue`, or `None` when it is not
/// available.
fn parse_raw_flow(&self) -> Option<FlowValue>;
}
/// Blanket implementation: every type exposing its raw flow through
/// [`FlowValueGetter`] can also hand it out deserialized.
impl<T> ParsedFlowValueGetter for T
where
    T: FlowValueGetter,
{
    /// Deserializes the raw flow JSON into a [`FlowValue`].
    ///
    /// Returns `None` when there is no raw flow, and also when the JSON does
    /// not deserialize into a `FlowValue` (the error is discarded).
    fn parse_raw_flow(&self) -> Option<FlowValue> {
        let raw = self.get_raw_flow_value()?;
        serde_json::from_str::<FlowValue>((**raw).get()).ok()
    }
}
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct StopAfterIf {
pub expr: String,

View File

@@ -15,9 +15,9 @@ pub const PREPROCESSOR_FAKE_ENTRYPOINT: &str = "__WM_PREPROCESSOR";
use crate::{
error::{self, to_anyhow, Error},
flow_status::{FlowStatus, RestartedFrom},
flows::{FlowValue, Retry},
get_latest_deployed_hash_for_path,
flow_status::{FlowStatusGetter, RestartedFrom},
flows::{FlowValue, FlowValueGetter, Retry},
get_latest_deployed_hash_for_path, impl_flow_status_getter, impl_flow_value_getter,
scripts::{ScriptHash, ScriptLang},
worker::{to_raw_value, TMP_DIR},
};
@@ -119,10 +119,7 @@ pub struct QueuedJob {
impl QueuedJob {
pub fn script_path(&self) -> &str {
self.script_path
.as_ref()
.map(String::as_str)
.unwrap_or("tmp/main")
self.script_path.as_deref().unwrap_or("tmp/main")
}
pub fn is_flow(&self) -> bool {
matches!(
@@ -139,22 +136,11 @@ impl QueuedJob {
self.script_path()
)
}
pub fn parse_raw_flow(&self) -> Option<FlowValue> {
self.raw_flow.as_ref().and_then(|v| {
let str = (**v).get();
// tracing::error!("raw_flow: {}", str);
return serde_json::from_str::<FlowValue>(str).ok();
})
}
pub fn parse_flow_status(&self) -> Option<FlowStatus> {
self.flow_status
.as_ref()
.and_then(|v| serde_json::from_str::<FlowStatus>((**v).get()).ok())
}
}
impl_flow_status_getter!(QueuedJob);
impl_flow_value_getter!(QueuedJob);
impl Default for QueuedJob {
fn default() -> Self {
Self {
@@ -266,23 +252,13 @@ impl CompletedJob {
pub fn json_result(&self) -> Option<serde_json::Value> {
self.result
.as_ref()
.map(|r| serde_json::from_str(r.get()).ok())
.flatten()
}
pub fn parse_raw_flow(&self) -> Option<FlowValue> {
self.raw_flow
.as_ref()
.and_then(|v| serde_json::from_str::<FlowValue>((**v).get()).ok())
}
pub fn parse_flow_status(&self) -> Option<FlowStatus> {
self.flow_status
.as_ref()
.and_then(|v| serde_json::from_str::<FlowStatus>((**v).get()).ok())
.and_then(|r| serde_json::from_str(r.get()).ok())
}
}
impl_flow_status_getter!(CompletedJob);
impl_flow_value_getter!(CompletedJob);
#[derive(sqlx::FromRow)]
pub struct BranchResults {
pub result: sqlx::types::Json<Box<RawValue>>,

View File

@@ -6,6 +6,7 @@
* LICENSE-AGPL for a copy of the license.
*/
pub mod macros;
use std::{
net::SocketAddr,
sync::{atomic::AtomicBool, Arc},

View File

@@ -0,0 +1,57 @@
/// Fetches exactly one row from `$table`, retrying against `$fallback_table`
/// when the first attempt finds no row.
///
/// Arguments, in order:
/// - `$db`: executor passed to `fetch_one`; it is used twice when the fallback
///   runs.
/// - `$query_method`: name of the `sqlx` function to call (e.g. `query_as`).
/// - `$row_type`: output type given to that function.
/// - `$query`: SQL literal containing one `{}` placeholder for the table name;
///   it is substituted at compile time with `const_format::formatcp!`.
/// - `$table || $fallback_table`: primary and fallback table name literals.
/// - `$param, ...`: values bound, in order, to the query. Each expression is
///   expanded once per attempt, so it must be usable twice (pass references
///   or `Copy` values).
///
/// Evaluates to the `Result` of `fetch_one`. Only `sqlx::Error::RowNotFound`
/// triggers the fallback; any other error from the primary query is returned
/// unchanged. Must be expanded inside an `async` context.
#[macro_export]
macro_rules! fetch_one_with_fallback {
    ($db:expr, $query_method:ident, $row_type:ty, $query:literal, $table:literal || $fallback_table:literal, $( $param:expr ),* ) => {{
        let primary_query = sqlx::$query_method::<_, $row_type>(const_format::formatcp!($query, $table))
            $(.bind($param))*
            .fetch_one($db)
            .await;

        if let Err(sqlx::Error::RowNotFound) = primary_query {
            // Macro metavariables are not substituted inside string literals,
            // so the table names are passed as format arguments.
            tracing::info!(
                "Data not found in {}, falling back to fetching from {}",
                $table,
                $fallback_table
            );
            sqlx::$query_method::<_, $row_type>(const_format::formatcp!($query, $fallback_table))
                $(.bind($param))*
                .fetch_one($db)
                .await
        } else {
            primary_query
        }
    }};
}
/// Fetches at most one row from `$table`, retrying against `$fallback_table`
/// when the first attempt succeeds but returns no row.
///
/// Arguments, in order:
/// - `$db`: executor passed to `fetch_optional`; it is used twice when the
///   fallback runs.
/// - `$query_method`: name of the `sqlx` function to call (e.g. `query_as`).
/// - `$row_type`: output type given to that function.
/// - `$query`: SQL literal containing one `{}` placeholder for the table name;
///   it is substituted at compile time with `const_format::formatcp!`.
/// - `$table || $fallback_table`: primary and fallback table name literals.
/// - `$param, ...`: values bound, in order, to the query. Each expression is
///   expanded once per attempt, so it must be usable twice (pass references
///   or `Copy` values).
///
/// Evaluates to the `Result` of `fetch_optional`. Only `Ok(None)` triggers the
/// fallback; an error from the primary query is returned unchanged. Must be
/// expanded inside an `async` context.
#[macro_export]
macro_rules! fetch_optional_with_fallback {
    ($db:expr, $query_method:ident, $row_type:ty, $query:literal, $table:literal || $fallback_table:literal, $( $param:expr ),* ) => {{
        let primary_query = sqlx::$query_method::<_, $row_type>(const_format::formatcp!($query, $table))
            $(.bind($param))*
            .fetch_optional($db)
            .await;

        if let Ok(None) = primary_query {
            // Macro metavariables are not substituted inside string literals,
            // so the table names are passed as format arguments.
            tracing::info!(
                "Data not found in {}, falling back to fetching from {}",
                $table,
                $fallback_table
            );
            sqlx::$query_method::<_, $row_type>(const_format::formatcp!($query, $fallback_table))
                $(.bind($param))*
                .fetch_optional($db)
                .await
        } else {
            primary_query
        }
    }};
}
/// Runs `$query` through `sqlx::query_scalar!` and, when it succeeds without
/// returning a row, runs `$fallback_query` with the same parameters.
///
/// Arguments, in order:
/// - `$tx`: a value that can be reborrowed as `&mut *$tx` and used as the
///   executor (e.g. an open transaction).
/// - `$query`: primary SQL literal.
/// - `$fallback_query`: SQL literal used when the primary returns no row. Both
///   branches must produce the same scalar type.
/// - `$param, ...`: parameters passed to both queries. Each expression is
///   expanded once per query, so it must be usable twice.
///
/// Evaluates to the `Result` of `fetch_optional`. Only `Ok(None)` triggers the
/// fallback; an error from the primary query is returned unchanged. Must be
/// expanded inside an `async` context.
#[macro_export]
macro_rules! query_scalar_with_fallback {
    ($tx:expr, $query:literal, $fallback_query:literal,$( $param:expr ),* ) => {{
        let primary_query = sqlx::query_scalar!($query,$($param),*)
            .fetch_optional(&mut *$tx)
            .await;

        if let Ok(None) = primary_query {
            // The macro does not know which tables the two queries target, so
            // the message stays generic rather than naming specific tables.
            tracing::info!("Primary query returned no row, falling back to the fallback query");
            sqlx::query_scalar!($fallback_query, $($param),*)
                .fetch_optional(&mut *$tx)
                .await
        } else {
            primary_query
        }
    }};
}

View File

@@ -91,7 +91,7 @@ lazy_static::lazy_static! {
}
pub async fn make_suspended_pull_query(wc: &WorkerConfig) {
if wc.worker_tags.len() == 0 {
if wc.worker_tags.is_empty() {
tracing::error!("Empty tags in worker tags, skipping");
return;
}
@@ -123,7 +123,7 @@ pub async fn make_suspended_pull_query(wc: &WorkerConfig) {
pub async fn make_pull_query(wc: &WorkerConfig) {
let mut queries = vec![];
for tags in wc.priority_tags_sorted.iter() {
if tags.tags.len() == 0 {
if tags.tags.is_empty() {
tracing::error!("Empty tags in priority tags, skipping");
continue;
}

View File

@@ -42,4 +42,5 @@ async-recursion.workspace = true
bigdecimal.workspace = true
axum.workspace = true
serde_urlencoded.workspace = true
regex.workspace = true
regex.workspace = true
const_format.workspace = true

View File

@@ -6,8 +6,6 @@
* LICENSE-AGPL for a copy of the license.
*/
use std::{borrow::Borrow, collections::HashMap, sync::Arc, vec};
use anyhow::Context;
use async_recursion::async_recursion;
use axum::{
@@ -31,12 +29,14 @@ use serde_json::{json, value::RawValue};
use sqlx::{types::Json, FromRow, Pool, Postgres, Transaction};
#[cfg(feature = "benchmark")]
use std::time::Instant;
use std::{borrow::Borrow, collections::HashMap, sync::Arc, vec};
use tokio::{sync::RwLock, time::sleep};
use tracing::{instrument, Instrument};
use ulid::Ulid;
use uuid::Uuid;
use windmill_audit::audit_ee::{audit_log, AuditAuthor};
use windmill_audit::ActionKind;
use windmill_common::{fetch_optional_with_fallback, flows::FlowValueGetter};
use windmill_common::{
add_time,
@@ -44,12 +44,15 @@ use windmill_common::{
db::{Authed, UserDB},
error::{self, to_anyhow, Error},
flow_status::{
BranchAllStatus, FlowCleanupModule, FlowStatus, FlowStatusModule, FlowStatusModuleWParent,
Iterator, JobResult, RestartedFrom, RetryStatus, MAX_RETRY_ATTEMPTS, MAX_RETRY_INTERVAL,
BranchAllStatus, FlowCleanupModule, FlowStatus, FlowStatusGetter, FlowStatusModule,
FlowStatusModuleWParent, Iterator, JobResult, ParsedFlowStatusGetter, RestartedFrom,
RetryStatus, MAX_RETRY_ATTEMPTS, MAX_RETRY_INTERVAL,
},
flows::{
add_virtual_items_if_necessary, FlowModule, FlowModuleValue, FlowValue, InputTransform,
ParsedFlowValueGetter,
},
impl_flow_status_getter, impl_flow_value_getter,
jobs::{
get_payload_tag_from_prefixed_path, CompletedJob, JobKind, JobPayload, QueuedJob, RawCode,
ENTRYPOINT_OVERRIDE, PREPROCESSOR_FAKE_ENTRYPOINT,
@@ -142,6 +145,14 @@ pub struct CanceledBy {
pub reason: Option<String>,
}
/// Row type holding only a job's `id` and raw `flow_status`, used when walking
/// completed flows so that a full `CompletedJob` does not have to be loaded.
/// Queries decoding into this type must return columns named `id` and
/// `flow_status`.
#[derive(Debug, sqlx::FromRow)]
pub struct CompletedSubFlow {
pub id: Uuid,
pub flow_status: Option<sqlx::types::Json<Box<RawValue>>>,
}
// Provides `get_raw_flow_status` by borrowing the `flow_status` field.
impl_flow_status_getter!(CompletedSubFlow);
pub async fn cancel_single_job<'c>(
username: &str,
reason: Option<String>,
@@ -222,26 +233,23 @@ pub async fn cancel_job<'c>(
force_cancel: bool,
require_anonymous: bool,
) -> error::Result<(Transaction<'c, Postgres>, Option<Uuid>)> {
let job = get_queued_job_tx(id, &w_id, &mut tx).await?;
if job.is_none() {
let Some(mut job) = get_queued_job_tx(id, w_id, &mut tx).await? else {
return Ok((tx, None));
}
};
if require_anonymous && job.as_ref().unwrap().created_by != "anonymous" {
if require_anonymous && job.created_by != "anonymous" {
return Err(Error::BadRequest(
"You are not logged in and this job was not created by an anonymous user like you so you cannot cancel it".to_string(),
));
}
let mut job = job.unwrap();
if force_cancel {
// if force canceling a flow step, make sure we force cancel from the highest parent
loop {
if job.parent_job.is_none() {
break;
}
match get_queued_job_tx(job.parent_job.unwrap(), &w_id, &mut tx).await? {
match get_queued_job_tx(job.parent_job.unwrap(), w_id, &mut tx).await? {
Some(j) => {
job = j;
}
@@ -587,6 +595,8 @@ pub async fn add_completed_job<
let mem_peak = mem_peak.max(queued_job.mem_peak.unwrap_or(0));
add_time!(bench, "add_completed_job query START");
// On conflict (when id already exists), update the success and result fields.
let _duration: i64 = sqlx::query_scalar!(
"INSERT INTO completed_job AS cj
( workspace_id
@@ -623,8 +633,8 @@ pub async fn add_completed_job<
VALUES ($1, $2, $3, $4, $5, COALESCE($6, now()), (EXTRACT('epoch' FROM (now())) - EXTRACT('epoch' FROM (COALESCE($6, now()))))*1000, $7, $8, $9,\
$10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29)
ON CONFLICT (id) DO UPDATE SET success = $7, result = $11 RETURNING duration_ms",
queued_job.workspace_id,
queued_job.id,
&queued_job.workspace_id,
&queued_job.id,
queued_job.parent_job,
queued_job.created_by,
queued_job.created_at,
@@ -633,7 +643,7 @@ pub async fn add_completed_job<
queued_job.script_hash.map(|x| x.0),
queued_job.script_path,
&queued_job.args as &Option<Json<HashMap<String, Box<RawValue>>>>,
result as Json<&T>,
&result as &Json<&T>,
queued_job.raw_code,
queued_job.raw_lock,
canceled_by.is_some(),
@@ -650,16 +660,32 @@ pub async fn add_completed_job<
queued_job.email,
queued_job.visible_to_owner,
if mem_peak > 0 { Some(mem_peak) } else { None },
queued_job.tag,
&queued_job.tag,
queued_job.priority,
)
.fetch_one(&mut tx)
.await
.map_err(|e| Error::InternalErr(format!("Could not add completed job {job_id}: {e:#}")))?;
// tracing::error!("2 {:?}", start.elapsed());
add_time!(bench, "add_completed_job query END");
add_time!(bench, "completed_jobs_result query START");
sqlx::query!(
"INSERT INTO completed_jobs_result(id, result, tag, workspace_id) VALUES($1, $2, $3, $4) ON CONFLICT (id) DO UPDATE SET result = $2",
queued_job.id,
&result as &Json<&T>,
queued_job.tag,
queued_job.workspace_id,
)
.execute(&mut tx)
.await
.map_err(|e| {
Error::InternalErr(format!(
"Could not add completed job result {job_id}: {e:#}"
))
})?;
add_time!(bench, "completed_jobs_result query END");
if !queued_job.is_flow_step {
if _duration > 500
&& (queued_job.job_kind == JobKind::Script || queued_job.job_kind == JobKind::Preview)
@@ -864,10 +890,10 @@ pub async fn add_completed_job<
tx.commit().await?;
tracing::info!(
%job_id,
root_job = ?queued_job.root_job.map(|x| x.to_string()).unwrap_or_else(|| String::new()),
root_job = ?queued_job.root_job.map(|x| x.to_string()).unwrap_or_default(),
path = &queued_job.script_path(),
job_kind = ?queued_job.job_kind,
started_at = ?queued_job.started_at.map(|x| x.to_string()).unwrap_or_else(|| String::new()),
started_at = ?queued_job.started_at.map(|x| x.to_string()).unwrap_or_default(),
duration = ?_duration,
permissioned_as = ?queued_job.permissioned_as,
email = ?queued_job.email,
@@ -2267,55 +2293,48 @@ pub struct ResultWithId {
pub async fn get_result_by_id(
db: Pool<Postgres>,
w_id: String,
w_id: &str,
flow_id: Uuid,
node_id: String,
json_path: Option<String>,
node_id: &str,
json_path: Option<&str>,
) -> error::Result<Box<RawValue>> {
match get_result_by_id_from_running_flow(
&db,
w_id.as_str(),
&flow_id,
node_id.as_str(),
json_path.clone(),
)
.await
{
/// Row type holding only the `id` and raw `flow_status` of a job in `queue`;
/// it replaces a full `QueuedJob` for the root-job lookup.
#[derive(sqlx::FromRow, Debug)]
struct RunningFlowJobResult {
pub id: Uuid,
pub flow_status: Option<Json<Box<RawValue>>>,
}
// Provides `get_raw_flow_status` by borrowing the `flow_status` field.
impl_flow_status_getter!(RunningFlowJobResult);
match get_result_by_id_from_running_flow(&db, w_id, &flow_id, node_id, json_path).await {
Ok(res) => Ok(res),
Err(_) => {
let running_flow_job =sqlx::query_as::<_, QueuedJob>(
"SELECT * FROM queue WHERE COALESCE((SELECT root_job FROM queue WHERE id = $1), $1) = id AND workspace_id = $2"
let running_flow_job =sqlx::query_as::<_, RunningFlowJobResult>(
"SELECT id, flow_status FROM queue WHERE COALESCE((SELECT root_job FROM queue WHERE id = $1), $1) = id AND workspace_id = $2"
).bind(flow_id)
.bind(&w_id)
.fetch_optional(&db).await?;
match running_flow_job {
Some(job) => {
let restarted_from = windmill_common::utils::not_found_if_none(
job.parse_flow_status()
.map(|status| status.restarted_from)
.flatten(),
.and_then(|status| status.restarted_from),
"Id not found in the result's mapping of the root job and root job had no restarted from information",
format!("parent: {}, root: {}, id: {}", flow_id, job.id, node_id),
)?;
get_result_by_id_from_original_flow(
&db,
w_id.as_str(),
w_id,
&restarted_from.flow_job_id,
node_id.as_str(),
json_path.clone(),
node_id,
json_path,
)
.await
}
None => {
get_result_by_id_from_original_flow(
&db,
w_id.as_str(),
&flow_id,
node_id.as_str(),
json_path.clone(),
)
.await
get_result_by_id_from_original_flow(&db, w_id, &flow_id, node_id, json_path)
.await
}
}
}
@@ -2334,7 +2353,7 @@ pub async fn get_result_by_id_from_running_flow(
w_id: &str,
flow_id: &Uuid,
node_id: &str,
json_path: Option<String>,
json_path: Option<&str>,
) -> error::Result<Box<RawValue>> {
let flow_job_result = sqlx::query_as::<_, FlowJobResult>(
"SELECT leaf_jobs->$1::text as leaf_jobs, parent_job FROM queue WHERE COALESCE((SELECT root_job FROM queue WHERE id = $2), $2) = id AND workspace_id = $3")
@@ -2352,8 +2371,7 @@ pub async fn get_result_by_id_from_running_flow(
let job_result = flow_job_result
.leaf_jobs
.map(|x| serde_json::from_str(x.get()).ok())
.flatten();
.and_then(|x| serde_json::from_str(x.get()).ok());
if job_result.is_none() && flow_job_result.parent_job.is_some() {
let parent_job = flow_job_result.parent_job.unwrap();
@@ -2378,9 +2396,9 @@ pub async fn get_result_by_id_from_running_flow(
async fn get_completed_flow_node_result_rec(
db: &Pool<Postgres>,
w_id: &str,
subflows: Vec<CompletedJob>,
subflows: &[CompletedSubFlow],
node_id: &str,
json_path: Option<String>,
json_path: Option<&str>,
) -> error::Result<Option<Box<RawValue>>> {
for subflow in subflows {
let flow_status = subflow.parse_flow_status().ok_or_else(|| {
@@ -2397,7 +2415,7 @@ async fn get_completed_flow_node_result_rec(
db,
w_id,
JobResult::SingleJob(leaf_job_uuid),
json_path.clone(),
json_path,
)
.await
.map(Some),
@@ -2405,7 +2423,7 @@ async fn get_completed_flow_node_result_rec(
db,
w_id,
JobResult::ListJob(jobs),
json_path.clone(),
json_path,
)
.await
.map(Some),
@@ -2416,10 +2434,11 @@ async fn get_completed_flow_node_result_rec(
))),
};
} else {
let subflows = sqlx::query_as::<_, CompletedJob>(
"SELECT *, null as labels FROM completed_job WHERE parent_job = $1 AND workspace_id = $2 AND flow_status IS NOT NULL",
let subflows = sqlx::query_as::<_, CompletedSubFlow>(
"SELECT id, flow_status as labels FROM completed_job WHERE parent_job = $1 AND workspace_id = $2 AND flow_status IS NOT NULL",
).bind(subflow.id).bind(w_id).fetch_all(db).await?;
match get_completed_flow_node_result_rec(db, w_id, subflows, node_id, json_path.clone())
match get_completed_flow_node_result_rec(db, w_id, &subflows, node_id, json_path)
.await?
{
Some(res) => return Ok(Some(res)),
@@ -2436,10 +2455,10 @@ async fn get_result_by_id_from_original_flow(
w_id: &str,
completed_flow_id: &Uuid,
node_id: &str,
json_path: Option<String>,
json_path: Option<&str>,
) -> error::Result<Box<RawValue>> {
let flow_job = sqlx::query_as::<_, CompletedJob>(
"SELECT *, null as labels FROM completed_job WHERE id = $1 AND workspace_id = $2",
let flow_job = sqlx::query_as::<_, CompletedSubFlow>(
"SELECT id, flow_status FROM completed_job WHERE id = $1 AND workspace_id = $2",
)
.bind(completed_flow_id)
.bind(w_id)
@@ -2452,7 +2471,7 @@ async fn get_result_by_id_from_original_flow(
format!("root: {}, id: {}", completed_flow_id, node_id),
)?;
match get_completed_flow_node_result_rec(db, w_id, vec![flow_job], node_id, json_path).await? {
match get_completed_flow_node_result_rec(db, w_id, &[flow_job], node_id, json_path).await? {
Some(res) => Ok(res),
None => Err(error::Error::NotFound(format!(
"Flow result by id not found going top-down from {}, (id: {})",
@@ -2465,31 +2484,33 @@ async fn extract_result_from_job_result(
db: &Pool<Postgres>,
w_id: &str,
job_result: JobResult,
json_path: Option<String>,
json_path: Option<&str>,
) -> error::Result<Box<RawValue>> {
match job_result {
JobResult::ListJob(job_ids) => match json_path {
Some(json_path) => {
let mut parts = json_path.split(".");
let mut parts = json_path.split('.');
let Some(idx) = parts.next().map(|x| x.parse::<usize>().ok()).flatten() else {
let Some(ref idx) = parts.next().and_then(|x| x.parse::<usize>().ok()) else {
return Ok(to_raw_value(&serde_json::Value::Null));
};
let Some(job_id) = job_ids.get(idx).cloned() else {
let Some(job_id) = job_ids.get(*idx) else {
return Ok(to_raw_value(&serde_json::Value::Null));
};
Ok(sqlx::query_as::<_, ResultR>(
"SELECT result #> $3 as result FROM completed_job WHERE id = $1 AND workspace_id = $2",
)
.bind(job_id)
.bind(w_id)
.bind(
parts.map(|x| x.to_string()).collect::<Vec<_>>()
)
.fetch_optional(db)
.await?
.map(|r| r.result.map(|x| x.0))
.flatten()
let parts = parts.map(|x| x.to_string()).collect_vec();
Ok(fetch_optional_with_fallback!(
db,
query_as,
ResultR,
"SELECT result #> $3 as result FROM {} WHERE id = $1 AND workspace_id = $2",
"completed_jobs_result" || "completed_job",
*job_id,
w_id,
&parts
)?
.and_then(|r| r.result.map(|x| x.0))
.unwrap_or_else(|| to_raw_value(&serde_json::Value::Null)))
}
None => {
@@ -2503,6 +2524,7 @@ async fn extract_result_from_job_result(
.into_iter()
.filter_map(|x| x.result.map(|y| (x.id, y)))
.collect::<HashMap<Uuid, Json<Box<RawValue>>>>();
let result = job_ids
.into_iter()
.map(|id| {
@@ -2510,25 +2532,31 @@ async fn extract_result_from_job_result(
.map(|x| x.0.clone())
.unwrap_or_else(|| to_raw_value(&serde_json::Value::Null))
})
.collect::<Vec<_>>();
.collect_vec();
Ok(to_raw_value(&result))
}
},
JobResult::SingleJob(x) => Ok(sqlx::query_as::<_, ResultR>(
"SELECT result #> $3 as result FROM completed_job WHERE id = $1 AND workspace_id = $2",
)
.bind(x)
.bind(w_id)
.bind(
json_path
// ici
JobResult::SingleJob(x) => {
let path = json_path
.map(|x| x.split(".").map(|x| x.to_string()).collect::<Vec<_>>())
.unwrap_or_default(),
)
.fetch_optional(db)
.await?
.map(|r| r.result.map(|x| x.0))
.flatten()
.unwrap_or_else(|| to_raw_value(&serde_json::Value::Null))),
.unwrap_or_default();
let res = fetch_optional_with_fallback!(
db,
query_as,
ResultR,
"SELECT result #> $3 as result FROM {} WHERE id = $1 AND workspace_id = $2",
"completed_jobs_result" || "completed_job",
x,
w_id,
&path
)?
.and_then(|r| r.result.map(|x| x.0))
.unwrap_or_else(|| to_raw_value(&serde_json::Value::Null));
Ok(res)
}
}
}
@@ -2643,7 +2671,7 @@ pub struct PushArgsOwned {
pub args: HashMap<String, Box<RawValue>>,
}
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct PushArgs<'c> {
pub extra: Option<HashMap<String, Box<RawValue>>>,
pub args: &'c HashMap<String, Box<RawValue>>,
@@ -2754,7 +2782,7 @@ fn restructure_cloudevents_metadata(
.unwrap_or_else(|| to_raw_value(&serde_json::Value::Null));
let str = data.to_string();
let wrap_body = str.len() > 0 && str.chars().next().unwrap() != '{';
let wrap_body = !str.is_empty() && !str.starts_with('{');
if wrap_body {
let args = serde_json::from_str::<Option<Box<RawValue>>>(&str)
@@ -2784,7 +2812,7 @@ impl PushArgsOwned {
extra.insert("raw_string".to_string(), to_raw_value(&str));
}
let wrap_body = force_wrap_body || str.len() > 0 && str.chars().next().unwrap() != '{';
let wrap_body = force_wrap_body || !str.is_empty() && !str.starts_with('{');
if wrap_body {
let args = serde_json::from_str::<Option<Box<RawValue>>>(&str)
@@ -3895,6 +3923,10 @@ pub async fn push<'c, 'd, R: rsmq_async::RsmqConnection + Send + 'c>(
};
tracing::debug!("Pushing job {job_id} with tag {tag}, schedule_path {schedule_path:?}, script_path: {script_path:?}, email {email}, workspace_id {workspace_id}");
let raw_flow = raw_flow.map(Json);
let args = Json(args);
let uuid = sqlx::query_scalar!(
"INSERT INTO queue
(workspace_id, id, running, parent_job, created_by, permissioned_as, scheduled_for,
@@ -3913,12 +3945,12 @@ pub async fn push<'c, 'd, R: rsmq_async::RsmqConnection + Send + 'c>(
scheduled_for_o,
script_hash,
script_path.clone(),
raw_code,
raw_code.clone(),
raw_lock,
Json(args) as Json<PushArgs>,
args.clone() as Json<PushArgs>,
job_kind.clone() as JobKind,
schedule_path,
raw_flow.map(Json) as Option<Json<FlowValue>>,
raw_flow.clone() as Option<Json<FlowValue>>,
flow_status.map(Json) as Option<Json<FlowStatus>>,
is_flow_step,
language as Option<ScriptLang>,
@@ -3927,7 +3959,7 @@ pub async fn push<'c, 'd, R: rsmq_async::RsmqConnection + Send + 'c>(
email,
visible_to_owner,
root_job,
tag,
tag.clone(),
concurrent_limit,
if concurrent_limit.is_some() { concurrency_time_window_s } else { None },
custom_timeout,
@@ -3939,6 +3971,34 @@ pub async fn push<'c, 'd, R: rsmq_async::RsmqConnection + Send + 'c>(
.await
.map_err(|e| Error::InternalErr(format!("Could not insert into queue {job_id} with tag {tag}, schedule_path {schedule_path:?}, script_path: {script_path:?}, email {email}, workspace_id {workspace_id}: {e:#}")))?;
// insert into args queue
sqlx::query!(
r#"
INSERT INTO job_args (id, workspace_id, args, tag)
VALUES ($1, $2, $3, $4)
"#,
uuid,
workspace_id,
args as Json<PushArgs>,
&tag
)
.execute(&mut tx)
.await?;
sqlx::query!(
r#"
INSERT INTO job_params (id, workspace_id, raw_code, raw_flow, tag)
VALUES ($1, $2, $3, $4, $5)
"#,
uuid,
workspace_id,
raw_code,
raw_flow as Option<Json<FlowValue>>,
tag
)
.execute(&mut tx)
.await?;
tracing::debug!("Pushed {job_id}");
// TODO: technically the job isn't queued yet, as the transaction can be rolled back. Should be solved when moving these metrics to the queue abstraction.
#[cfg(feature = "prometheus")]
@@ -4059,11 +4119,8 @@ pub async fn push<'c, 'd, R: rsmq_async::RsmqConnection + Send + 'c>(
}
/// Builds the JSON value describing a canceled job.
///
/// The returned object has four keys:
/// - `message`: a sentence of the form `Job canceled: <reason> by <canceler>`,
/// - `name`: always `"Canceled"`,
/// - `reason`: `job.canceled_reason`, or `"no reason given"` when it is `None`,
/// - `canceler`: `job.canceled_by`, or `"unknown"` when it is `None`.
pub fn canceled_job_to_result(job: &QueuedJob) -> serde_json::Value {
    // The fallbacks are plain string literals, so the eager `unwrap_or` is
    // sufficient; each value is bound exactly once.
    let reason = job.canceled_reason.as_deref().unwrap_or("no reason given");
    let canceler = job.canceled_by.as_deref().unwrap_or("unknown");
    serde_json::json!({"message": format!("Job canceled: {reason} by {canceler}"), "name": "Canceled", "reason": reason, "canceler": canceler})
}
@@ -4086,8 +4143,19 @@ async fn restarted_flows_resolution(
),
Error,
> {
let completed_job = sqlx::query_as::<_, CompletedJob>(
"SELECT *, null as labels FROM completed_job WHERE id = $1 and workspace_id = $2",
#[derive(Debug, sqlx::FromRow)]
struct QueryResults {
pub script_path: Option<String>,
pub priority: Option<i16>,
pub raw_flow: Option<sqlx::types::Json<Box<RawValue>>>,
pub flow_status: Option<sqlx::types::Json<Box<RawValue>>>,
}
impl_flow_status_getter!(QueryResults);
impl_flow_value_getter!(QueryResults);
let completed_job = sqlx::query_as::<_, QueryResults>(
"SELECT script_path, priority, raw_flow, flow_status FROM completed_job WHERE id = $1 and workspace_id = $2",
)
.bind(completed_flow_id)
.bind(workspace_id)
@@ -4120,10 +4188,9 @@ async fn restarted_flows_resolution(
if flow_value_if_any
.clone()
.map(|fv| {
fv.modules
!fv.modules
.iter()
.find(|flow_value_module| flow_value_module.id == module.id())
.is_none()
.any(|flow_value_module| flow_value_module.id == module.id())
})
.unwrap_or(false)
{
@@ -4239,7 +4306,7 @@ async fn restarted_flows_resolution(
truncated_modules.push(FlowStatusModule::WaitingForPriorSteps { id: module.id() });
} else {
// else we simply "transfer" the module from the completed flow to the new one if it's a success
step_n = step_n + 1;
step_n += 1;
match module.clone() {
FlowStatusModule::Success { .. } => Ok(truncated_modules.push(module)),
_ => Err(Error::InternalErr(format!(

View File

@@ -29,20 +29,20 @@ use sqlx::FromRow;
use tokio::sync::mpsc::Sender;
use tracing::instrument;
use uuid::Uuid;
use windmill_common::add_time;
use windmill_common::auth::JobPerms;
#[cfg(feature = "benchmark")]
use windmill_common::bench::BenchmarkIter;
use windmill_common::db::Authed;
use windmill_common::flow_status::{
ApprovalConditions, FlowStatusModuleWParent, Iterator, JobResult,
ApprovalConditions, FlowStatusModuleWParent, Iterator, JobResult, ParsedFlowStatusGetter,
};
use windmill_common::flows::add_virtual_items_if_necessary;
use windmill_common::flows::{add_virtual_items_if_necessary, ParsedFlowValueGetter};
use windmill_common::jobs::{
script_hash_to_tag_and_limits, script_path_to_payload, BranchResults, JobPayload, QueuedJob,
RawCode, ENTRYPOINT_OVERRIDE,
};
use windmill_common::worker::to_raw_value;
use windmill_common::{add_time, fetch_one_with_fallback, fetch_optional_with_fallback};
use windmill_common::{
error::{self, to_anyhow, Error},
flow_status::{
@@ -61,6 +61,19 @@ type DB = sqlx::Pool<sqlx::Postgres>;
use windmill_queue::{canceled_job_to_result, get_queued_job_tx, push, QueueTransaction};
async fn get_args_from_job_id(db: &DB, id: &Uuid) -> Result<RowArgs, Error> {
let args = fetch_one_with_fallback!(
db,
query_as,
RowArgs,
"SELECT args FROM {} WHERE id = $1",
"job_args" || "queue",
id
);
args.map_err(|e| Error::InternalErr(format!("retrieval of args from state: {e:#}")))
}
// #[instrument(level = "trace", skip_all)]
pub async fn update_flow_status_after_job_completion<
R: rsmq_async::RsmqConnection + Send + Sync + Clone,
@@ -229,10 +242,10 @@ pub async fn update_flow_status_after_job_completion_internal<
})?;
let old_status = serde_json::from_str::<FlowStatus>(old_status_json.flow_status.get())
.or_else(|e| {
Err(Error::InternalErr(format!(
.map_err(|e| {
Error::InternalErr(format!(
"requiring status to be parsable as FlowStatus: {e:?}"
)))
))
})?;
let current_module = if let Some(x) = old_status_json.current_module {
@@ -289,7 +302,7 @@ pub async fn update_flow_status_after_job_completion_internal<
// 0 length flows are not failure steps
let is_failure_step =
old_status.step >= old_status.modules.len() as i32 && old_status.modules.len() > 0;
old_status.step >= old_status.modules.len() as i32 && !old_status.modules.is_empty();
let (mut stop_early, mut skip_if_stop_early, continue_on_error) = if let Some(se) =
stop_early_override
@@ -302,14 +315,13 @@ pub async fn update_flow_status_after_job_completion_internal<
};
let is_flow = if let Some(step) = step {
sqlx::query_scalar!(
"SELECT raw_flow->'modules'->($1)->'value'->>'type' = 'flow' FROM queue WHERE id = $2",
fetch_one_with_fallback!(db,
query_scalar,
Option<bool>,
"SELECT raw_flow->'modules'->($1)->'value'->>'type' = 'flow' FROM {} WHERE id = $2",
"job_params" || "queue",
step as i32,
&flow
)
.fetch_one(db)
.await
.map_err(|e| {
&flow).map_err(|e| {
Error::InternalErr(format!("error during retrieval of step's type: {e:#}"))
})?
.unwrap_or(false)
@@ -342,19 +354,9 @@ pub async fn update_flow_status_after_job_completion_internal<
}
_ => None,
};
let args = sqlx::query_as::<_, RowArgs>(
"SELECT
args
FROM queue
WHERE id = $2",
)
.bind(old_status.step)
.bind(flow)
.fetch_one(db)
.await
.map_err(|e| {
Error::InternalErr(format!("retrieval of args from state: {e:#}"))
})?;
let args = get_args_from_job_id(db, &flow).await?;
compute_bool_from_expr(
expr.to_string(),
Marc::new(args.args.unwrap_or_default().0),
@@ -442,7 +444,7 @@ pub async fn update_flow_status_after_job_completion_internal<
None
};
let nindex = if let Some(position) = position {
let nindex = if let Some(position) = position {
sqlx::query_scalar!(
"UPDATE queue
SET flow_status = JSONB_SET(
@@ -489,7 +491,7 @@ pub async fn update_flow_status_after_job_completion_internal<
None
};
let nindex = if let Some(position) = position {
let nindex = if let Some(position) = position {
sqlx::query_scalar!(
"UPDATE queue
SET flow_status = JSONB_SET(
@@ -658,7 +660,7 @@ pub async fn update_flow_status_after_job_completion_internal<
flow_jobs_success,
flow_jobs,
..
} if branch.to_owned() < len - 1 && (success || skip_branch_failure) => {
} if *branch < len - 1 && (success || skip_branch_failure) => {
if let Some(jobs) = flow_jobs {
set_success_in_flow_job_success(
flow_jobs_success,
@@ -870,19 +872,7 @@ pub async fn update_flow_status_after_job_completion_internal<
.as_ref()
.and_then(|m| m.stop_after_all_iters_if.as_ref().map(|x| x.expr.clone()))
{
let args = sqlx::query_as::<_, RowArgs>(
"SELECT
args
FROM queue
WHERE id = $2",
)
.bind(old_status.step)
.bind(flow)
.fetch_one(db)
.await
.map_err(|e| {
Error::InternalErr(format!("retrieval of args from state: {e:#}"))
})?;
let args = get_args_from_job_id(db, &flow).await?;
let should_stop = compute_bool_from_expr(
expr.to_string(),
@@ -1244,7 +1234,7 @@ fn get_module(flow_job: &QueuedJob, module_step: &Step) -> Option<FlowModule> {
if let Some(raw_flow) = raw_flow {
match module_step {
Step::PreprocessorStep => raw_flow.preprocessor_module.map(|x| *x.clone()),
Step::Step(i) => raw_flow.modules.get(*i).map(|x| x.clone()),
Step::Step(i) => raw_flow.modules.get(*i).cloned(),
Step::FailureStep => raw_flow.failure_module.map(|x| *x.clone()),
}
} else {
@@ -1269,8 +1259,7 @@ async fn compute_skip_branchall_failure<'c>(
.map(|p| {
BRANCHALL_INDEX_RE
.captures(&p)
.map(|x| x.get(1).unwrap().as_str().parse::<i32>().ok())
.flatten()
.and_then(|x| x.get(1).unwrap().as_str().parse::<i32>().ok())
.ok_or(Error::InternalErr(format!(
"could not parse branchall index from path: {p}"
)))
@@ -1291,14 +1280,14 @@ async fn compute_skip_branchall_failure<'c>(
}
async fn has_failure_module<'c>(flow: Uuid, db: &DB) -> Result<bool, Error> {
sqlx::query_scalar::<_, Option<bool>>(
"SELECT raw_flow->'failure_module' != 'null'::jsonb
FROM queue
WHERE id = $1",
fetch_one_with_fallback!(
db,
query_scalar,
Option<bool>,
"SELECT raw_flow->'failure_module' != 'null'::jsonb FROM {} WHERE id = $1",
"job_params" || "queue",
flow
)
.bind(flow)
.fetch_one(db)
.await
.map_err(|e| {
Error::InternalErr(format!(
"error during retrieval of has_failure_module: {e:#}"
@@ -1610,6 +1599,7 @@ pub struct ResumeRow {
/// Row type for a query that returns a single nullable `args` column,
/// decoded from JSON into a map of string keys to raw (unparsed) JSON values.
#[derive(FromRow)]
pub struct RawArgs {
// NOTE(review): the lint is presumably silenced because nothing reads this
// field any more — confirm, and consider removing the struct if it is unused.
#[allow(dead_code)]
pub args: Option<Json<HashMap<String, Box<RawValue>>>>,
}
@@ -1638,6 +1628,7 @@ fn potentially_crash_for_testing() {
// `EHM` is a lazily-initialised, always-empty map of string keys to raw JSON
// values.
// NOTE(review): presumably shared as a default "no arguments" map so callers
// can borrow it instead of allocating — confirm against its call sites.
lazy_static::lazy_static! {
pub static ref EHM: HashMap<String, Box<RawValue>> = HashMap::new();
}
// #[async_recursion]
// #[instrument(level = "trace", skip_all)]
async fn push_next_flow_job<R: rsmq_async::RsmqConnection + Send + Sync + Clone>(
@@ -1725,7 +1716,7 @@ async fn push_next_flow_job<R: rsmq_async::RsmqConnection + Send + Sync + Clone>
flow_job.workspace_id.as_str(),
flow_job.id
).fetch_all(db).await?;
if overlapping.len() > 0 {
if !overlapping.is_empty() {
let overlapping_str = overlapping
.iter()
.map(|x| x.to_string())
@@ -1804,8 +1795,8 @@ async fn push_next_flow_job<R: rsmq_async::RsmqConnection + Send + Sync + Clone>
// else pass the last job result. Either from the function arg if it's set, or manually fetch it from the previous job
// having last_job_result empty can happen either when the job was suspended and is being restarted, or if it's a
// flow restart from a specific step
if last_job_result.is_some() {
last_job_result.unwrap()
if let Some(last_job_result) = last_job_result {
last_job_result
} else {
match get_previous_job_result(db, flow_job.workspace_id.as_str(), &status).await? {
None => Arc::new(to_raw_value(&json!("{}"))),
@@ -2260,17 +2251,18 @@ async fn push_next_flow_job<R: rsmq_async::RsmqConnection + Send + Sync + Clone>
);
Ok(Marc::new(hm))
} else if let Some(id) = get_args_from_id {
let row = sqlx::query_as::<_, RawArgs>(
"SELECT args FROM completed_job WHERE id = $1 AND workspace_id = $2",
)
.bind(id)
.bind(&flow_job.workspace_id)
.fetch_optional(db)
.await?;
let row = fetch_optional_with_fallback!(
db,
query_as,
RowArgs,
"SELECT args FROM {} WHERE id = $1 AND workspace_id = $2",
"job_args" || "completed_job",
id,
&flow_job.workspace_id
)?;
if let Some(raw_args) = row {
Ok(Marc::new(
raw_args.args.map(|x| x.0).unwrap_or_else(HashMap::new),
))
Ok(Marc::new(raw_args.args.map(|x| x.0).unwrap_or_default()))
} else {
Ok(Marc::new(HashMap::new()))
}
@@ -2389,7 +2381,7 @@ async fn push_next_flow_job<R: rsmq_async::RsmqConnection + Send + Sync + Clone>
let mut tx: QueueTransaction<'_, R> = (rsmq.clone(), db.begin().await?).into();
let nargs = args.as_ref();
for i in (0..len).into_iter() {
for i in 0..len {
if i % 100 == 0 && i != 0 {
tracing::info!(id = %flow_job.id, root_id = %job_root, "pushed (non-commited yet) first {i} subflows of {len}");
sqlx::query!(
@@ -3902,13 +3894,15 @@ async fn get_previous_job_result(
Ok(Some(retrieve_flow_jobs_results(db, w_id, flow_jobs).await?))
}
Some(FlowStatusModule::Success { job, .. }) => Ok(Some(
sqlx::query_scalar::<_, Json<Box<RawValue>>>(
"SELECT result FROM completed_job WHERE id = $1 AND workspace_id = $2",
)
.bind(job)
.bind(w_id)
.fetch_one(db)
.await?
fetch_one_with_fallback!(
db,
query_scalar,
Json<Box<RawValue>>,
"SELECT result FROM {} WHERE id = $1 AND workspace_id = $2",
"completed_jobs_result" || "completed_job",
job,
w_id
)?
.0,
)),
_ => Ok(None),

3
frontend/.gitignore vendored
View File

@@ -9,4 +9,5 @@ tests-out/
storageState.json
.env.production
dist/
static/tsdocs/
static/tsdocs/
!build/.gitkeep