Compare commits
24 Commits
rf/bundle1
...
migration/
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fd25c4bf72 | ||
|
|
b031e2f48c | ||
|
|
c95b59d5da | ||
|
|
c1b779ebb5 | ||
|
|
7b31a900f6 | ||
|
|
2f9cc1498d | ||
|
|
6baa549cdb | ||
|
|
4aa269f71f | ||
|
|
7e5956e5a0 | ||
|
|
7a6d404a11 | ||
|
|
fd5d89b56a | ||
|
|
2ad08d9194 | ||
|
|
32428ed5f3 | ||
|
|
b20d83e58e | ||
|
|
6300bb6e71 | ||
|
|
f6d9cd062d | ||
|
|
3e34bc7956 | ||
|
|
045a52cded | ||
|
|
f4df3affa3 | ||
|
|
698375cb17 | ||
|
|
c7a327e1d8 | ||
|
|
7d605e88b0 | ||
|
|
1ce19166ed | ||
|
|
a0542dc853 |
1
backend/Cargo.lock
generated
1
backend/Cargo.lock
generated
@@ -10834,6 +10834,7 @@ dependencies = [
|
||||
"bigdecimal",
|
||||
"chrono",
|
||||
"chrono-tz 0.10.0",
|
||||
"const_format",
|
||||
"cron",
|
||||
"futures-core",
|
||||
"hex",
|
||||
|
||||
@@ -0,0 +1,3 @@
|
||||
DROP TABLE job_params;
|
||||
DROP TABLE job_args;
|
||||
DROP TABLE completed_jobs_result;
|
||||
@@ -0,0 +1,25 @@
|
||||
-- Add up migration script here
|
||||
-- Add down migration script here
|
||||
-- Add up migration script here
|
||||
CREATE TABLE job_params (
|
||||
id UUID PRIMARY KEY,
|
||||
raw_code TEXT,
|
||||
raw_flow jsonb NULL,
|
||||
tag VARCHAR(50),
|
||||
workspace_id VARCHAR(50)
|
||||
);
|
||||
|
||||
-- Add up migration script here
|
||||
CREATE TABLE job_args (
|
||||
id UUID PRIMARY KEY,
|
||||
args JSONB,
|
||||
tag VARCHAR(50),
|
||||
workspace_id VARCHAR(50)
|
||||
);
|
||||
|
||||
CREATE TABLE completed_jobs_result (
|
||||
id UUID PRIMARY KEY,
|
||||
result JSONB,
|
||||
tag VARCHAR(50),
|
||||
workspace_id VARCHAR(50)
|
||||
);
|
||||
@@ -19,6 +19,7 @@ use tokio::{
|
||||
sync::{mpsc, RwLock},
|
||||
};
|
||||
|
||||
use uuid::Uuid;
|
||||
#[cfg(feature = "embedding")]
|
||||
use windmill_api::embeddings::update_embeddings_db;
|
||||
use windmill_api::{
|
||||
@@ -32,7 +33,7 @@ use windmill_common::{
|
||||
auth::JWT_SECRET,
|
||||
ee::CriticalErrorChannel,
|
||||
error,
|
||||
flow_status::FlowStatusModule,
|
||||
flow_status::{FlowStatusModule, ParsedFlowStatusGetter as _},
|
||||
global_settings::{
|
||||
BASE_URL_SETTING, BUNFIG_INSTALL_SCOPES_SETTING, CRITICAL_ERROR_CHANNELS_SETTING,
|
||||
DEFAULT_TAGS_PER_WORKSPACE_SETTING, DEFAULT_TAGS_WORKSPACES_SETTING,
|
||||
@@ -1420,7 +1421,7 @@ async fn handle_zombie_flows(
|
||||
}
|
||||
);
|
||||
report_critical_error(reason.clone(), db.clone()).await;
|
||||
cancel_zombie_flow_job(db, flow, &rsmq, reason).await?;
|
||||
cancel_zombie_flow_job(db, &flow.id, &flow.workspace_id, &rsmq, reason).await?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1436,11 +1437,17 @@ async fn handle_zombie_flows(
|
||||
.fetch_all(db)
|
||||
.await?;
|
||||
|
||||
#[derive(sqlx::FromRow, Debug)]
|
||||
struct InQueueJobResult {
|
||||
id: uuid::Uuid,
|
||||
workspace_id: String,
|
||||
}
|
||||
|
||||
for flow in flows2 {
|
||||
let in_queue = sqlx::query_as::<_, QueuedJob>(
|
||||
"SELECT * FROM queue WHERE id = $1 AND running = true AND canceled = false",
|
||||
let in_queue = sqlx::query_as!(InQueueJobResult,
|
||||
"SELECT id, workspace_id FROM queue WHERE id = $1 AND running = true AND canceled = false",
|
||||
flow.parent_flow_id
|
||||
)
|
||||
.bind(flow.parent_flow_id)
|
||||
.fetch_optional(db)
|
||||
.await?;
|
||||
if let Some(job) = in_queue {
|
||||
@@ -1450,7 +1457,7 @@ async fn handle_zombie_flows(
|
||||
job.workspace_id,
|
||||
flow.last_ping
|
||||
);
|
||||
cancel_zombie_flow_job(db, job, &rsmq,
|
||||
cancel_zombie_flow_job(db, &job.id, &job.workspace_id, &rsmq,
|
||||
format!("Flow {} cancelled as one of the parallel branch {} was unable to make the last transition ", flow.parent_flow_id, flow.job_id))
|
||||
.await?;
|
||||
} else {
|
||||
@@ -1462,21 +1469,22 @@ async fn handle_zombie_flows(
|
||||
|
||||
async fn cancel_zombie_flow_job(
|
||||
db: &Pool<Postgres>,
|
||||
flow: QueuedJob,
|
||||
job_id: &Uuid,
|
||||
workspace_id: &str,
|
||||
rsmq: &Option<MultiplexedRsmq>,
|
||||
message: String,
|
||||
) -> Result<(), error::Error> {
|
||||
let tx = db.begin().await.unwrap();
|
||||
tracing::error!(
|
||||
"zombie flow detected: {} in workspace {}. Cancelling it.",
|
||||
flow.id,
|
||||
flow.workspace_id
|
||||
job_id,
|
||||
workspace_id
|
||||
);
|
||||
let (ntx, _) = cancel_job(
|
||||
"monitor",
|
||||
Some(message),
|
||||
flow.id,
|
||||
flow.workspace_id.as_str(),
|
||||
*job_id,
|
||||
workspace_id,
|
||||
tx,
|
||||
db,
|
||||
rsmq.clone(),
|
||||
|
||||
@@ -21,11 +21,7 @@ use std::{
|
||||
vec,
|
||||
};
|
||||
use windmill_common::{
|
||||
db::UserDB,
|
||||
error::JsonResult,
|
||||
jobs::JobKind,
|
||||
scripts::to_i64,
|
||||
utils::{not_found_if_none, paginate, Pagination},
|
||||
db::UserDB, error::JsonResult, jobs::JobKind, query_scalar_with_fallback, scripts::to_i64, utils::{not_found_if_none, paginate, Pagination}
|
||||
};
|
||||
pub fn workspaced_service() -> Router {
|
||||
Router::new()
|
||||
@@ -178,6 +174,7 @@ struct GetArgs {
|
||||
input: Option<bool>,
|
||||
allow_large: Option<bool>,
|
||||
}
|
||||
|
||||
async fn get_args_from_history_or_saved_input(
|
||||
authed: ApiAuthed,
|
||||
Extension(user_db): Extension<UserDB>,
|
||||
@@ -185,6 +182,7 @@ async fn get_args_from_history_or_saved_input(
|
||||
Path((w_id, job_or_input_id)): Path<(String, Uuid)>,
|
||||
) -> JsonResult<Option<Value>> {
|
||||
let mut tx = user_db.begin(&authed).await?;
|
||||
|
||||
let result_o = if let Some(input) = g.input {
|
||||
if input {
|
||||
sqlx::query_scalar!(
|
||||
@@ -196,24 +194,20 @@ async fn get_args_from_history_or_saved_input(
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?
|
||||
} else {
|
||||
sqlx::query_scalar!(
|
||||
query_scalar_with_fallback!(tx,
|
||||
"SELECT CASE WHEN pg_column_size(args) < 40000 OR $3 THEN args ELSE '\"WINDMILL_TOO_BIG\"'::jsonb END as args FROM job_args WHERE id = $1 AND workspace_id = $2",
|
||||
"SELECT CASE WHEN pg_column_size(args) < 40000 OR $3 THEN args ELSE '\"WINDMILL_TOO_BIG\"'::jsonb END as args FROM completed_job WHERE id = $1 AND workspace_id = $2",
|
||||
job_or_input_id,
|
||||
job_or_input_id,
|
||||
w_id,
|
||||
g.allow_large.unwrap_or(true)
|
||||
)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?
|
||||
g.allow_large.unwrap_or(true))?
|
||||
}
|
||||
} else {
|
||||
sqlx::query_scalar!(
|
||||
"SELECT CASE WHEN pg_column_size(args) < 40000 OR $3 THEN args ELSE '\"WINDMILL_TOO_BIG\"'::jsonb END as args FROM completed_job WHERE id = $1 AND workspace_id = $2 UNION ALL SELECT CASE WHEN pg_column_size(args) < 40000 OR $3 THEN args ELSE '\"WINDMILL_TOO_BIG\"'::jsonb END as args FROM input WHERE id = $1 AND workspace_id = $2",
|
||||
job_or_input_id,
|
||||
w_id,
|
||||
g.allow_large.unwrap_or(true)
|
||||
)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?
|
||||
query_scalar_with_fallback!(tx,
|
||||
"SELECT CASE WHEN pg_column_size(args) < 40000 OR $3 THEN args ELSE '\"WINDMILL_TOO_BIG\"'::jsonb END as args FROM job_args WHERE id = $1 AND workspace_id = $2 UNION ALL SELECT CASE WHEN pg_column_size(args) < 40000 OR $3 THEN args ELSE '\"WINDMILL_TOO_BIG\"'::jsonb END as args FROM input WHERE id = $1 AND workspace_id = $2",
|
||||
"SELECT CASE WHEN pg_column_size(args) < 40000 OR $3 THEN args ELSE '\"WINDMILL_TOO_BIG\"'::jsonb END as args FROM completed_job WHERE id = $1 AND workspace_id = $2 UNION ALL SELECT CASE WHEN pg_column_size(args) < 40000 OR $3 THEN args ELSE '\"WINDMILL_TOO_BIG\"'::jsonb END as args FROM input WHERE id = $1 AND workspace_id = $2",
|
||||
job_or_input_id,
|
||||
w_id,
|
||||
g.allow_large.unwrap_or(true))?
|
||||
};
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
@@ -314,9 +314,14 @@ async fn get_result_by_id(
|
||||
Path((w_id, flow_id, node_id)): Path<(String, Uuid, String)>,
|
||||
Query(JsonPath { json_path, .. }): Query<JsonPath>,
|
||||
) -> windmill_common::error::JsonResult<Box<JsonRawValue>> {
|
||||
let res =
|
||||
windmill_queue::get_result_by_id(db.clone(), w_id.clone(), flow_id, node_id, json_path)
|
||||
.await?;
|
||||
let res = windmill_queue::get_result_by_id(
|
||||
db.clone(),
|
||||
&w_id,
|
||||
flow_id,
|
||||
&node_id,
|
||||
json_path.as_deref(),
|
||||
)
|
||||
.await?;
|
||||
|
||||
log_job_view(&db, Some(&authed), &w_id, &flow_id).await?;
|
||||
|
||||
@@ -1349,6 +1354,8 @@ async fn cancel_jobs(
|
||||
) -> error::JsonResult<Vec<Uuid>> {
|
||||
let mut uuids = vec![];
|
||||
let mut tx = db.begin().await?;
|
||||
|
||||
let result = serde_json::json!({"error": { "message": format!("Job canceled: cancel all by {username}"), "name": "Canceled", "reason": "cancel all", "canceler": username}});
|
||||
let trivial_jobs = sqlx::query!("INSERT INTO completed_job AS cj
|
||||
( workspace_id
|
||||
, id
|
||||
@@ -1412,10 +1419,19 @@ async fn cancel_jobs(
|
||||
, tag
|
||||
, priority FROM queue
|
||||
WHERE id = any($2) AND running = false AND parent_job IS NULL AND workspace_id = $3 AND schedule_path IS NULL FOR UPDATE SKIP LOCKED
|
||||
ON CONFLICT (id) DO NOTHING RETURNING id", username, &jobs, w_id, serde_json::json!({"error": { "message": format!("Job canceled: cancel all by {username}"), "name": "Canceled", "reason": "cancel all", "canceler": username}}))
|
||||
ON CONFLICT (id) DO NOTHING RETURNING id", username, &jobs, w_id, &result)
|
||||
.fetch_all(&mut *tx)
|
||||
.await?.into_iter().map(|x| x.id).collect::<Vec<Uuid>>();
|
||||
|
||||
sqlx::query!(
|
||||
"INSERT INTO completed_jobs_result(id, result, tag, workspace_id) SELECT id, $1, tag, $2 FROM completed_job WHERE id = any($3) ON CONFLICT (id) DO NOTHING",
|
||||
result,
|
||||
w_id,
|
||||
&trivial_jobs,
|
||||
)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
sqlx::query!(
|
||||
"DELETE FROM queue WHERE id = any($1) AND workspace_id = $2",
|
||||
&trivial_jobs,
|
||||
@@ -1423,6 +1439,7 @@ async fn cancel_jobs(
|
||||
)
|
||||
.execute(&mut *tx)
|
||||
.await?;
|
||||
|
||||
tx.commit().await?;
|
||||
|
||||
// sqlx::query!(
|
||||
@@ -1436,18 +1453,10 @@ async fn cancel_jobs(
|
||||
continue;
|
||||
}
|
||||
let rsmq = rsmq.clone();
|
||||
match tokio::time::timeout(tokio::time::Duration::from_secs(5), async move {
|
||||
if let Ok(result) = tokio::time::timeout(tokio::time::Duration::from_secs(5), async move {
|
||||
let tx = db.begin().await?;
|
||||
let (tx, _) = windmill_queue::cancel_job(
|
||||
username,
|
||||
None,
|
||||
job_id.clone(),
|
||||
w_id,
|
||||
tx,
|
||||
db,
|
||||
rsmq,
|
||||
false,
|
||||
false,
|
||||
username, None, job_id, w_id, tx, db, rsmq, false, false,
|
||||
)
|
||||
.await?;
|
||||
tx.commit().await?;
|
||||
@@ -1455,20 +1464,19 @@ async fn cancel_jobs(
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(result) => match result {
|
||||
match result {
|
||||
Ok(_) => {
|
||||
uuids.push(job_id);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to cancel job {:?}: {:?}", job_id, e);
|
||||
}
|
||||
},
|
||||
Err(_) => {
|
||||
tracing::error!(
|
||||
"Timeout while trying to cancel job {:?} after 5 seconds",
|
||||
job_id
|
||||
);
|
||||
}
|
||||
} else {
|
||||
tracing::error!(
|
||||
"Timeout while trying to cancel job {:?} after 5 seconds",
|
||||
job_id
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3283,7 +3291,7 @@ async fn run_wait_result(
|
||||
};
|
||||
|
||||
let fast_poll_duration = *WAIT_RESULT_FAST_POLL_DURATION_SECS as u64 * 1000;
|
||||
let mut accumulated_delay = 0 as u64;
|
||||
let mut accumulated_delay = 0_u64;
|
||||
|
||||
loop {
|
||||
if let Some(node_id_for_empty_return) = node_id_for_empty_return.as_ref() {
|
||||
@@ -3400,6 +3408,16 @@ async fn delete_job_metadata_after_use(db: &DB, job_uuid: Uuid) -> Result<(), Er
|
||||
)
|
||||
.execute(db)
|
||||
.await?;
|
||||
|
||||
sqlx::query!(
|
||||
"UPDATE job_args
|
||||
SET args = '{}'::jsonb
|
||||
WHERE id = $1",
|
||||
job_uuid,
|
||||
)
|
||||
.execute(db)
|
||||
.await?;
|
||||
|
||||
sqlx::query!(
|
||||
"UPDATE job_logs
|
||||
SET logs = '##DELETED##'
|
||||
@@ -4384,11 +4402,8 @@ async fn add_batch_jobs(
|
||||
tx = PushIsolationLevel::Transaction(ntx);
|
||||
uuids.push(uuid);
|
||||
}
|
||||
match tx {
|
||||
PushIsolationLevel::Transaction(tx) => {
|
||||
tx.commit().await?;
|
||||
}
|
||||
_ => (),
|
||||
if let PushIsolationLevel::Transaction(tx) = tx {
|
||||
tx.commit().await?;
|
||||
}
|
||||
return Ok(Json(uuids));
|
||||
}
|
||||
|
||||
@@ -31,7 +31,6 @@ use windmill_audit::ActionKind;
|
||||
use windmill_common::{
|
||||
db::UserDB,
|
||||
error::{Error, JsonResult, Result},
|
||||
jobs::QueuedJob,
|
||||
utils::{not_found_if_none, paginate, require_admin, Pagination, StripPath},
|
||||
variables,
|
||||
};
|
||||
@@ -536,11 +535,29 @@ pub async fn transform_json_value<'c>(
|
||||
}
|
||||
Value::String(y) if y.starts_with("$") && job_id.is_some() => {
|
||||
let mut tx = authed_transaction_or_default(authed, user_db.clone(), db).await?;
|
||||
let job = sqlx::query_as::<_, QueuedJob>(
|
||||
"SELECT * FROM queue WHERE id = $1 AND workspace_id = $2",
|
||||
|
||||
#[derive(sqlx::FromRow, Debug)]
|
||||
struct QueuedJobLite {
|
||||
pub id: Uuid,
|
||||
pub workspace_id: String,
|
||||
pub parent_job: Option<Uuid>,
|
||||
pub created_by: String,
|
||||
pub email: String,
|
||||
pub permissioned_as: String,
|
||||
pub script_path: Option<String>,
|
||||
pub schedule_path: Option<String>,
|
||||
pub root_job: Option<Uuid>,
|
||||
pub flow_step_id: Option<String>,
|
||||
pub scheduled_for: chrono::DateTime<chrono::Utc>,
|
||||
}
|
||||
|
||||
let job = sqlx::query_as!(
|
||||
QueuedJobLite,
|
||||
"SELECT id, workspace_id,parent_job, created_by, email, permissioned_as, script_path, schedule_path, root_job, flow_step_id, scheduled_for
|
||||
FROM queue WHERE id = $1 AND workspace_id = $2",
|
||||
job_id.unwrap(),
|
||||
workspace
|
||||
)
|
||||
.bind(job_id.unwrap())
|
||||
.bind(workspace)
|
||||
.fetch_optional(&mut *tx)
|
||||
.await?;
|
||||
tx.commit().await?;
|
||||
|
||||
@@ -45,6 +45,33 @@ pub struct FlowStatus {
|
||||
pub restarted_from: Option<RestartedFrom>,
|
||||
}
|
||||
|
||||
pub trait FlowStatusGetter {
|
||||
fn get_raw_flow_status(&self) -> Option<&sqlx::types::Json<Box<serde_json::value::RawValue>>>;
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! impl_flow_status_getter {
|
||||
($struct_name:ident) => {
|
||||
impl FlowStatusGetter for $struct_name {
|
||||
fn get_raw_flow_status(
|
||||
&self,
|
||||
) -> Option<&sqlx::types::Json<Box<serde_json::value::RawValue>>> {
|
||||
self.flow_status.as_ref()
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
pub trait ParsedFlowStatusGetter {
|
||||
fn parse_flow_status(&self) -> Option<FlowStatus>;
|
||||
}
|
||||
impl<I: FlowStatusGetter> ParsedFlowStatusGetter for I {
|
||||
fn parse_flow_status(&self) -> Option<FlowStatus> {
|
||||
self.get_raw_flow_status()
|
||||
.and_then(|v| serde_json::from_str::<FlowStatus>((**v).get()).ok())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
|
||||
#[serde(default)]
|
||||
pub struct RetryStatus {
|
||||
|
||||
@@ -125,6 +125,34 @@ pub struct FlowValue {
|
||||
pub concurrency_key: Option<String>,
|
||||
}
|
||||
|
||||
pub trait FlowValueGetter {
|
||||
fn get_raw_flow_value(&self) -> Option<&sqlx::types::Json<Box<serde_json::value::RawValue>>>;
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! impl_flow_value_getter {
|
||||
($struct_name:ident) => {
|
||||
impl FlowValueGetter for $struct_name {
|
||||
fn get_raw_flow_value(
|
||||
&self,
|
||||
) -> Option<&sqlx::types::Json<Box<serde_json::value::RawValue>>> {
|
||||
self.raw_flow.as_ref()
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
pub trait ParsedFlowValueGetter {
|
||||
fn parse_raw_flow(&self) -> Option<FlowValue>;
|
||||
}
|
||||
|
||||
impl<I: FlowValueGetter> ParsedFlowValueGetter for I {
|
||||
fn parse_raw_flow(&self) -> Option<FlowValue> {
|
||||
self.get_raw_flow_value()
|
||||
.and_then(|v| serde_json::from_str::<FlowValue>((**v).get()).ok())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Debug, Clone)]
|
||||
pub struct StopAfterIf {
|
||||
pub expr: String,
|
||||
|
||||
@@ -15,9 +15,9 @@ pub const PREPROCESSOR_FAKE_ENTRYPOINT: &str = "__WM_PREPROCESSOR";
|
||||
|
||||
use crate::{
|
||||
error::{self, to_anyhow, Error},
|
||||
flow_status::{FlowStatus, RestartedFrom},
|
||||
flows::{FlowValue, Retry},
|
||||
get_latest_deployed_hash_for_path,
|
||||
flow_status::{FlowStatusGetter, RestartedFrom},
|
||||
flows::{FlowValue, FlowValueGetter, Retry},
|
||||
get_latest_deployed_hash_for_path, impl_flow_status_getter, impl_flow_value_getter,
|
||||
scripts::{ScriptHash, ScriptLang},
|
||||
worker::{to_raw_value, TMP_DIR},
|
||||
};
|
||||
@@ -119,10 +119,7 @@ pub struct QueuedJob {
|
||||
|
||||
impl QueuedJob {
|
||||
pub fn script_path(&self) -> &str {
|
||||
self.script_path
|
||||
.as_ref()
|
||||
.map(String::as_str)
|
||||
.unwrap_or("tmp/main")
|
||||
self.script_path.as_deref().unwrap_or("tmp/main")
|
||||
}
|
||||
pub fn is_flow(&self) -> bool {
|
||||
matches!(
|
||||
@@ -139,22 +136,11 @@ impl QueuedJob {
|
||||
self.script_path()
|
||||
)
|
||||
}
|
||||
|
||||
pub fn parse_raw_flow(&self) -> Option<FlowValue> {
|
||||
self.raw_flow.as_ref().and_then(|v| {
|
||||
let str = (**v).get();
|
||||
// tracing::error!("raw_flow: {}", str);
|
||||
return serde_json::from_str::<FlowValue>(str).ok();
|
||||
})
|
||||
}
|
||||
|
||||
pub fn parse_flow_status(&self) -> Option<FlowStatus> {
|
||||
self.flow_status
|
||||
.as_ref()
|
||||
.and_then(|v| serde_json::from_str::<FlowStatus>((**v).get()).ok())
|
||||
}
|
||||
}
|
||||
|
||||
impl_flow_status_getter!(QueuedJob);
|
||||
impl_flow_value_getter!(QueuedJob);
|
||||
|
||||
impl Default for QueuedJob {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
@@ -266,23 +252,13 @@ impl CompletedJob {
|
||||
pub fn json_result(&self) -> Option<serde_json::Value> {
|
||||
self.result
|
||||
.as_ref()
|
||||
.map(|r| serde_json::from_str(r.get()).ok())
|
||||
.flatten()
|
||||
}
|
||||
|
||||
pub fn parse_raw_flow(&self) -> Option<FlowValue> {
|
||||
self.raw_flow
|
||||
.as_ref()
|
||||
.and_then(|v| serde_json::from_str::<FlowValue>((**v).get()).ok())
|
||||
}
|
||||
|
||||
pub fn parse_flow_status(&self) -> Option<FlowStatus> {
|
||||
self.flow_status
|
||||
.as_ref()
|
||||
.and_then(|v| serde_json::from_str::<FlowStatus>((**v).get()).ok())
|
||||
.and_then(|r| serde_json::from_str(r.get()).ok())
|
||||
}
|
||||
}
|
||||
|
||||
impl_flow_status_getter!(CompletedJob);
|
||||
impl_flow_value_getter!(CompletedJob);
|
||||
|
||||
#[derive(sqlx::FromRow)]
|
||||
pub struct BranchResults {
|
||||
pub result: sqlx::types::Json<Box<RawValue>>,
|
||||
|
||||
@@ -6,6 +6,7 @@
|
||||
* LICENSE-AGPL for a copy of the license.
|
||||
*/
|
||||
|
||||
pub mod macros;
|
||||
use std::{
|
||||
net::SocketAddr,
|
||||
sync::{atomic::AtomicBool, Arc},
|
||||
|
||||
57
backend/windmill-common/src/macros.rs
Normal file
57
backend/windmill-common/src/macros.rs
Normal file
@@ -0,0 +1,57 @@
|
||||
#[macro_export]
|
||||
macro_rules! fetch_one_with_fallback {
|
||||
($db:expr, $query_method:ident, $row_type:ty, $query:literal, $table:literal || $fallback_table:literal, $( $param:expr ),* ) => {{
|
||||
let primary_query = sqlx::$query_method::<_, $row_type>(const_format::formatcp!($query, $table))
|
||||
$(.bind($param))*
|
||||
.fetch_one($db)
|
||||
.await;
|
||||
|
||||
if let Err(sqlx::Error::RowNotFound) = primary_query {
|
||||
tracing::info!("Data not found in job_params, falling back to fetching from $fallback_table");
|
||||
sqlx::$query_method::<_, $row_type>(const_format::formatcp!($query, $fallback_table))
|
||||
$(.bind($param))*
|
||||
.fetch_one($db)
|
||||
.await
|
||||
} else {
|
||||
primary_query
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! fetch_optional_with_fallback {
|
||||
($db:expr, $query_method:ident, $row_type:ty, $query:literal, $table:literal || $fallback_table:literal, $( $param:expr ),* ) => {{
|
||||
let primary_query = sqlx::$query_method::<_, $row_type>(const_format::formatcp!($query, $table))
|
||||
$(.bind($param))*
|
||||
.fetch_optional($db)
|
||||
.await;
|
||||
|
||||
if let Ok(None) = primary_query {
|
||||
tracing::info!("Data not found in job_params, falling back to fetching from $fallback_table");
|
||||
sqlx::$query_method::<_, $row_type>(const_format::formatcp!($query, $fallback_table))
|
||||
$(.bind($param))*
|
||||
.fetch_optional($db)
|
||||
.await
|
||||
} else {
|
||||
primary_query
|
||||
}
|
||||
}};
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! query_scalar_with_fallback {
|
||||
($tx:expr, $query:literal, $fallback_query:literal,$( $param:expr ),* ) => {{
|
||||
let primary_query = sqlx::query_scalar!($query,$($param),*)
|
||||
.fetch_optional(&mut *$tx)
|
||||
.await;
|
||||
|
||||
if let Ok(None) = primary_query {
|
||||
tracing::info!("Data not found in job_args, falling back to fetching from queue");
|
||||
sqlx::query_scalar!($fallback_query, $($param),*)
|
||||
.fetch_optional(&mut *$tx)
|
||||
.await
|
||||
} else {
|
||||
primary_query
|
||||
}
|
||||
}};
|
||||
}
|
||||
@@ -91,7 +91,7 @@ lazy_static::lazy_static! {
|
||||
}
|
||||
|
||||
pub async fn make_suspended_pull_query(wc: &WorkerConfig) {
|
||||
if wc.worker_tags.len() == 0 {
|
||||
if wc.worker_tags.is_empty() {
|
||||
tracing::error!("Empty tags in worker tags, skipping");
|
||||
return;
|
||||
}
|
||||
@@ -123,7 +123,7 @@ pub async fn make_suspended_pull_query(wc: &WorkerConfig) {
|
||||
pub async fn make_pull_query(wc: &WorkerConfig) {
|
||||
let mut queries = vec![];
|
||||
for tags in wc.priority_tags_sorted.iter() {
|
||||
if tags.tags.len() == 0 {
|
||||
if tags.tags.is_empty() {
|
||||
tracing::error!("Empty tags in priority tags, skipping");
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -42,4 +42,5 @@ async-recursion.workspace = true
|
||||
bigdecimal.workspace = true
|
||||
axum.workspace = true
|
||||
serde_urlencoded.workspace = true
|
||||
regex.workspace = true
|
||||
regex.workspace = true
|
||||
const_format.workspace = true
|
||||
|
||||
@@ -6,8 +6,6 @@
|
||||
* LICENSE-AGPL for a copy of the license.
|
||||
*/
|
||||
|
||||
use std::{borrow::Borrow, collections::HashMap, sync::Arc, vec};
|
||||
|
||||
use anyhow::Context;
|
||||
use async_recursion::async_recursion;
|
||||
use axum::{
|
||||
@@ -31,12 +29,14 @@ use serde_json::{json, value::RawValue};
|
||||
use sqlx::{types::Json, FromRow, Pool, Postgres, Transaction};
|
||||
#[cfg(feature = "benchmark")]
|
||||
use std::time::Instant;
|
||||
use std::{borrow::Borrow, collections::HashMap, sync::Arc, vec};
|
||||
use tokio::{sync::RwLock, time::sleep};
|
||||
use tracing::{instrument, Instrument};
|
||||
use ulid::Ulid;
|
||||
use uuid::Uuid;
|
||||
use windmill_audit::audit_ee::{audit_log, AuditAuthor};
|
||||
use windmill_audit::ActionKind;
|
||||
use windmill_common::{fetch_optional_with_fallback, flows::FlowValueGetter};
|
||||
|
||||
use windmill_common::{
|
||||
add_time,
|
||||
@@ -44,12 +44,15 @@ use windmill_common::{
|
||||
db::{Authed, UserDB},
|
||||
error::{self, to_anyhow, Error},
|
||||
flow_status::{
|
||||
BranchAllStatus, FlowCleanupModule, FlowStatus, FlowStatusModule, FlowStatusModuleWParent,
|
||||
Iterator, JobResult, RestartedFrom, RetryStatus, MAX_RETRY_ATTEMPTS, MAX_RETRY_INTERVAL,
|
||||
BranchAllStatus, FlowCleanupModule, FlowStatus, FlowStatusGetter, FlowStatusModule,
|
||||
FlowStatusModuleWParent, Iterator, JobResult, ParsedFlowStatusGetter, RestartedFrom,
|
||||
RetryStatus, MAX_RETRY_ATTEMPTS, MAX_RETRY_INTERVAL,
|
||||
},
|
||||
flows::{
|
||||
add_virtual_items_if_necessary, FlowModule, FlowModuleValue, FlowValue, InputTransform,
|
||||
ParsedFlowValueGetter,
|
||||
},
|
||||
impl_flow_status_getter, impl_flow_value_getter,
|
||||
jobs::{
|
||||
get_payload_tag_from_prefixed_path, CompletedJob, JobKind, JobPayload, QueuedJob, RawCode,
|
||||
ENTRYPOINT_OVERRIDE, PREPROCESSOR_FAKE_ENTRYPOINT,
|
||||
@@ -142,6 +145,14 @@ pub struct CanceledBy {
|
||||
pub reason: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, sqlx::FromRow)]
|
||||
pub struct CompletedSubFlow {
|
||||
pub id: Uuid,
|
||||
pub flow_status: Option<sqlx::types::Json<Box<RawValue>>>,
|
||||
}
|
||||
|
||||
impl_flow_status_getter!(CompletedSubFlow);
|
||||
|
||||
pub async fn cancel_single_job<'c>(
|
||||
username: &str,
|
||||
reason: Option<String>,
|
||||
@@ -222,26 +233,23 @@ pub async fn cancel_job<'c>(
|
||||
force_cancel: bool,
|
||||
require_anonymous: bool,
|
||||
) -> error::Result<(Transaction<'c, Postgres>, Option<Uuid>)> {
|
||||
let job = get_queued_job_tx(id, &w_id, &mut tx).await?;
|
||||
|
||||
if job.is_none() {
|
||||
let Some(mut job) = get_queued_job_tx(id, w_id, &mut tx).await? else {
|
||||
return Ok((tx, None));
|
||||
}
|
||||
};
|
||||
|
||||
if require_anonymous && job.as_ref().unwrap().created_by != "anonymous" {
|
||||
if require_anonymous && job.created_by != "anonymous" {
|
||||
return Err(Error::BadRequest(
|
||||
"You are not logged in and this job was not created by an anonymous user like you so you cannot cancel it".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let mut job = job.unwrap();
|
||||
if force_cancel {
|
||||
// if force canceling a flow step, make sure we force cancel from the highest parent
|
||||
loop {
|
||||
if job.parent_job.is_none() {
|
||||
break;
|
||||
}
|
||||
match get_queued_job_tx(job.parent_job.unwrap(), &w_id, &mut tx).await? {
|
||||
match get_queued_job_tx(job.parent_job.unwrap(), w_id, &mut tx).await? {
|
||||
Some(j) => {
|
||||
job = j;
|
||||
}
|
||||
@@ -587,6 +595,8 @@ pub async fn add_completed_job<
|
||||
|
||||
let mem_peak = mem_peak.max(queued_job.mem_peak.unwrap_or(0));
|
||||
add_time!(bench, "add_completed_job query START");
|
||||
|
||||
// On conflict (when id already exists), update the success and result fields.
|
||||
let _duration: i64 = sqlx::query_scalar!(
|
||||
"INSERT INTO completed_job AS cj
|
||||
( workspace_id
|
||||
@@ -623,8 +633,8 @@ pub async fn add_completed_job<
|
||||
VALUES ($1, $2, $3, $4, $5, COALESCE($6, now()), (EXTRACT('epoch' FROM (now())) - EXTRACT('epoch' FROM (COALESCE($6, now()))))*1000, $7, $8, $9,\
|
||||
$10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22, $23, $24, $25, $26, $27, $28, $29)
|
||||
ON CONFLICT (id) DO UPDATE SET success = $7, result = $11 RETURNING duration_ms",
|
||||
queued_job.workspace_id,
|
||||
queued_job.id,
|
||||
&queued_job.workspace_id,
|
||||
&queued_job.id,
|
||||
queued_job.parent_job,
|
||||
queued_job.created_by,
|
||||
queued_job.created_at,
|
||||
@@ -633,7 +643,7 @@ pub async fn add_completed_job<
|
||||
queued_job.script_hash.map(|x| x.0),
|
||||
queued_job.script_path,
|
||||
&queued_job.args as &Option<Json<HashMap<String, Box<RawValue>>>>,
|
||||
result as Json<&T>,
|
||||
&result as &Json<&T>,
|
||||
queued_job.raw_code,
|
||||
queued_job.raw_lock,
|
||||
canceled_by.is_some(),
|
||||
@@ -650,16 +660,32 @@ pub async fn add_completed_job<
|
||||
queued_job.email,
|
||||
queued_job.visible_to_owner,
|
||||
if mem_peak > 0 { Some(mem_peak) } else { None },
|
||||
queued_job.tag,
|
||||
&queued_job.tag,
|
||||
queued_job.priority,
|
||||
)
|
||||
.fetch_one(&mut tx)
|
||||
.await
|
||||
.map_err(|e| Error::InternalErr(format!("Could not add completed job {job_id}: {e:#}")))?;
|
||||
// tracing::error!("2 {:?}", start.elapsed());
|
||||
|
||||
add_time!(bench, "add_completed_job query END");
|
||||
|
||||
add_time!(bench, "completed_jobs_result query START");
|
||||
sqlx::query!(
|
||||
"INSERT INTO completed_jobs_result(id, result, tag, workspace_id) VALUES($1, $2, $3, $4) ON CONFLICT (id) DO UPDATE SET result = $2",
|
||||
queued_job.id,
|
||||
&result as &Json<&T>,
|
||||
queued_job.tag,
|
||||
queued_job.workspace_id,
|
||||
)
|
||||
.execute(&mut tx)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
Error::InternalErr(format!(
|
||||
"Could not add completed job result {job_id}: {e:#}"
|
||||
))
|
||||
})?;
|
||||
|
||||
add_time!(bench, "completed_jobs_result query END");
|
||||
|
||||
if !queued_job.is_flow_step {
|
||||
if _duration > 500
|
||||
&& (queued_job.job_kind == JobKind::Script || queued_job.job_kind == JobKind::Preview)
|
||||
@@ -864,10 +890,10 @@ pub async fn add_completed_job<
|
||||
tx.commit().await?;
|
||||
tracing::info!(
|
||||
%job_id,
|
||||
root_job = ?queued_job.root_job.map(|x| x.to_string()).unwrap_or_else(|| String::new()),
|
||||
root_job = ?queued_job.root_job.map(|x| x.to_string()).unwrap_or_default(),
|
||||
path = &queued_job.script_path(),
|
||||
job_kind = ?queued_job.job_kind,
|
||||
started_at = ?queued_job.started_at.map(|x| x.to_string()).unwrap_or_else(|| String::new()),
|
||||
started_at = ?queued_job.started_at.map(|x| x.to_string()).unwrap_or_default(),
|
||||
duration = ?_duration,
|
||||
permissioned_as = ?queued_job.permissioned_as,
|
||||
email = ?queued_job.email,
|
||||
@@ -2267,55 +2293,48 @@ pub struct ResultWithId {
|
||||
|
||||
pub async fn get_result_by_id(
|
||||
db: Pool<Postgres>,
|
||||
w_id: String,
|
||||
w_id: &str,
|
||||
flow_id: Uuid,
|
||||
node_id: String,
|
||||
json_path: Option<String>,
|
||||
node_id: &str,
|
||||
json_path: Option<&str>,
|
||||
) -> error::Result<Box<RawValue>> {
|
||||
match get_result_by_id_from_running_flow(
|
||||
&db,
|
||||
w_id.as_str(),
|
||||
&flow_id,
|
||||
node_id.as_str(),
|
||||
json_path.clone(),
|
||||
)
|
||||
.await
|
||||
{
|
||||
#[derive(sqlx::FromRow, Debug)]
|
||||
struct RunningFlowJobResult {
|
||||
pub id: Uuid,
|
||||
pub flow_status: Option<Json<Box<RawValue>>>,
|
||||
}
|
||||
impl_flow_status_getter!(RunningFlowJobResult);
|
||||
|
||||
match get_result_by_id_from_running_flow(&db, w_id, &flow_id, node_id, json_path).await {
|
||||
Ok(res) => Ok(res),
|
||||
Err(_) => {
|
||||
let running_flow_job =sqlx::query_as::<_, QueuedJob>(
|
||||
"SELECT * FROM queue WHERE COALESCE((SELECT root_job FROM queue WHERE id = $1), $1) = id AND workspace_id = $2"
|
||||
let running_flow_job =sqlx::query_as::<_, RunningFlowJobResult>(
|
||||
"SELECT id, flow_status FROM queue WHERE COALESCE((SELECT root_job FROM queue WHERE id = $1), $1) = id AND workspace_id = $2"
|
||||
).bind(flow_id)
|
||||
.bind(&w_id)
|
||||
.fetch_optional(&db).await?;
|
||||
|
||||
match running_flow_job {
|
||||
Some(job) => {
|
||||
let restarted_from = windmill_common::utils::not_found_if_none(
|
||||
job.parse_flow_status()
|
||||
.map(|status| status.restarted_from)
|
||||
.flatten(),
|
||||
.and_then(|status| status.restarted_from),
|
||||
"Id not found in the result's mapping of the root job and root job had no restarted from information",
|
||||
format!("parent: {}, root: {}, id: {}", flow_id, job.id, node_id),
|
||||
)?;
|
||||
|
||||
get_result_by_id_from_original_flow(
|
||||
&db,
|
||||
w_id.as_str(),
|
||||
w_id,
|
||||
&restarted_from.flow_job_id,
|
||||
node_id.as_str(),
|
||||
json_path.clone(),
|
||||
node_id,
|
||||
json_path,
|
||||
)
|
||||
.await
|
||||
}
|
||||
None => {
|
||||
get_result_by_id_from_original_flow(
|
||||
&db,
|
||||
w_id.as_str(),
|
||||
&flow_id,
|
||||
node_id.as_str(),
|
||||
json_path.clone(),
|
||||
)
|
||||
.await
|
||||
get_result_by_id_from_original_flow(&db, w_id, &flow_id, node_id, json_path)
|
||||
.await
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2334,7 +2353,7 @@ pub async fn get_result_by_id_from_running_flow(
|
||||
w_id: &str,
|
||||
flow_id: &Uuid,
|
||||
node_id: &str,
|
||||
json_path: Option<String>,
|
||||
json_path: Option<&str>,
|
||||
) -> error::Result<Box<RawValue>> {
|
||||
let flow_job_result = sqlx::query_as::<_, FlowJobResult>(
|
||||
"SELECT leaf_jobs->$1::text as leaf_jobs, parent_job FROM queue WHERE COALESCE((SELECT root_job FROM queue WHERE id = $2), $2) = id AND workspace_id = $3")
|
||||
@@ -2352,8 +2371,7 @@ pub async fn get_result_by_id_from_running_flow(
|
||||
|
||||
let job_result = flow_job_result
|
||||
.leaf_jobs
|
||||
.map(|x| serde_json::from_str(x.get()).ok())
|
||||
.flatten();
|
||||
.and_then(|x| serde_json::from_str(x.get()).ok());
|
||||
|
||||
if job_result.is_none() && flow_job_result.parent_job.is_some() {
|
||||
let parent_job = flow_job_result.parent_job.unwrap();
|
||||
@@ -2378,9 +2396,9 @@ pub async fn get_result_by_id_from_running_flow(
|
||||
async fn get_completed_flow_node_result_rec(
|
||||
db: &Pool<Postgres>,
|
||||
w_id: &str,
|
||||
subflows: Vec<CompletedJob>,
|
||||
subflows: &[CompletedSubFlow],
|
||||
node_id: &str,
|
||||
json_path: Option<String>,
|
||||
json_path: Option<&str>,
|
||||
) -> error::Result<Option<Box<RawValue>>> {
|
||||
for subflow in subflows {
|
||||
let flow_status = subflow.parse_flow_status().ok_or_else(|| {
|
||||
@@ -2397,7 +2415,7 @@ async fn get_completed_flow_node_result_rec(
|
||||
db,
|
||||
w_id,
|
||||
JobResult::SingleJob(leaf_job_uuid),
|
||||
json_path.clone(),
|
||||
json_path,
|
||||
)
|
||||
.await
|
||||
.map(Some),
|
||||
@@ -2405,7 +2423,7 @@ async fn get_completed_flow_node_result_rec(
|
||||
db,
|
||||
w_id,
|
||||
JobResult::ListJob(jobs),
|
||||
json_path.clone(),
|
||||
json_path,
|
||||
)
|
||||
.await
|
||||
.map(Some),
|
||||
@@ -2416,10 +2434,11 @@ async fn get_completed_flow_node_result_rec(
|
||||
))),
|
||||
};
|
||||
} else {
|
||||
let subflows = sqlx::query_as::<_, CompletedJob>(
|
||||
"SELECT *, null as labels FROM completed_job WHERE parent_job = $1 AND workspace_id = $2 AND flow_status IS NOT NULL",
|
||||
let subflows = sqlx::query_as::<_, CompletedSubFlow>(
|
||||
"SELECT id, flow_status as labels FROM completed_job WHERE parent_job = $1 AND workspace_id = $2 AND flow_status IS NOT NULL",
|
||||
).bind(subflow.id).bind(w_id).fetch_all(db).await?;
|
||||
match get_completed_flow_node_result_rec(db, w_id, subflows, node_id, json_path.clone())
|
||||
|
||||
match get_completed_flow_node_result_rec(db, w_id, &subflows, node_id, json_path)
|
||||
.await?
|
||||
{
|
||||
Some(res) => return Ok(Some(res)),
|
||||
@@ -2436,10 +2455,10 @@ async fn get_result_by_id_from_original_flow(
|
||||
w_id: &str,
|
||||
completed_flow_id: &Uuid,
|
||||
node_id: &str,
|
||||
json_path: Option<String>,
|
||||
json_path: Option<&str>,
|
||||
) -> error::Result<Box<RawValue>> {
|
||||
let flow_job = sqlx::query_as::<_, CompletedJob>(
|
||||
"SELECT *, null as labels FROM completed_job WHERE id = $1 AND workspace_id = $2",
|
||||
let flow_job = sqlx::query_as::<_, CompletedSubFlow>(
|
||||
"SELECT id, flow_status FROM completed_job WHERE id = $1 AND workspace_id = $2",
|
||||
)
|
||||
.bind(completed_flow_id)
|
||||
.bind(w_id)
|
||||
@@ -2452,7 +2471,7 @@ async fn get_result_by_id_from_original_flow(
|
||||
format!("root: {}, id: {}", completed_flow_id, node_id),
|
||||
)?;
|
||||
|
||||
match get_completed_flow_node_result_rec(db, w_id, vec![flow_job], node_id, json_path).await? {
|
||||
match get_completed_flow_node_result_rec(db, w_id, &[flow_job], node_id, json_path).await? {
|
||||
Some(res) => Ok(res),
|
||||
None => Err(error::Error::NotFound(format!(
|
||||
"Flow result by id not found going top-down from {}, (id: {})",
|
||||
@@ -2465,31 +2484,33 @@ async fn extract_result_from_job_result(
|
||||
db: &Pool<Postgres>,
|
||||
w_id: &str,
|
||||
job_result: JobResult,
|
||||
json_path: Option<String>,
|
||||
json_path: Option<&str>,
|
||||
) -> error::Result<Box<RawValue>> {
|
||||
match job_result {
|
||||
JobResult::ListJob(job_ids) => match json_path {
|
||||
Some(json_path) => {
|
||||
let mut parts = json_path.split(".");
|
||||
let mut parts = json_path.split('.');
|
||||
|
||||
let Some(idx) = parts.next().map(|x| x.parse::<usize>().ok()).flatten() else {
|
||||
let Some(ref idx) = parts.next().and_then(|x| x.parse::<usize>().ok()) else {
|
||||
return Ok(to_raw_value(&serde_json::Value::Null));
|
||||
};
|
||||
let Some(job_id) = job_ids.get(idx).cloned() else {
|
||||
let Some(job_id) = job_ids.get(*idx) else {
|
||||
return Ok(to_raw_value(&serde_json::Value::Null));
|
||||
};
|
||||
Ok(sqlx::query_as::<_, ResultR>(
|
||||
"SELECT result #> $3 as result FROM completed_job WHERE id = $1 AND workspace_id = $2",
|
||||
)
|
||||
.bind(job_id)
|
||||
.bind(w_id)
|
||||
.bind(
|
||||
parts.map(|x| x.to_string()).collect::<Vec<_>>()
|
||||
)
|
||||
.fetch_optional(db)
|
||||
.await?
|
||||
.map(|r| r.result.map(|x| x.0))
|
||||
.flatten()
|
||||
|
||||
let parts = parts.map(|x| x.to_string()).collect_vec();
|
||||
|
||||
Ok(fetch_optional_with_fallback!(
|
||||
db,
|
||||
query_as,
|
||||
ResultR,
|
||||
"SELECT result #> $3 as result FROM {} WHERE id = $1 AND workspace_id = $2",
|
||||
"completed_jobs_result" || "completed_job",
|
||||
*job_id,
|
||||
w_id,
|
||||
&parts
|
||||
)?
|
||||
.and_then(|r| r.result.map(|x| x.0))
|
||||
.unwrap_or_else(|| to_raw_value(&serde_json::Value::Null)))
|
||||
}
|
||||
None => {
|
||||
@@ -2503,6 +2524,7 @@ async fn extract_result_from_job_result(
|
||||
.into_iter()
|
||||
.filter_map(|x| x.result.map(|y| (x.id, y)))
|
||||
.collect::<HashMap<Uuid, Json<Box<RawValue>>>>();
|
||||
|
||||
let result = job_ids
|
||||
.into_iter()
|
||||
.map(|id| {
|
||||
@@ -2510,25 +2532,31 @@ async fn extract_result_from_job_result(
|
||||
.map(|x| x.0.clone())
|
||||
.unwrap_or_else(|| to_raw_value(&serde_json::Value::Null))
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
.collect_vec();
|
||||
Ok(to_raw_value(&result))
|
||||
}
|
||||
},
|
||||
JobResult::SingleJob(x) => Ok(sqlx::query_as::<_, ResultR>(
|
||||
"SELECT result #> $3 as result FROM completed_job WHERE id = $1 AND workspace_id = $2",
|
||||
)
|
||||
.bind(x)
|
||||
.bind(w_id)
|
||||
.bind(
|
||||
json_path
|
||||
// ici
|
||||
JobResult::SingleJob(x) => {
|
||||
let path = json_path
|
||||
.map(|x| x.split(".").map(|x| x.to_string()).collect::<Vec<_>>())
|
||||
.unwrap_or_default(),
|
||||
)
|
||||
.fetch_optional(db)
|
||||
.await?
|
||||
.map(|r| r.result.map(|x| x.0))
|
||||
.flatten()
|
||||
.unwrap_or_else(|| to_raw_value(&serde_json::Value::Null))),
|
||||
.unwrap_or_default();
|
||||
|
||||
let res = fetch_optional_with_fallback!(
|
||||
db,
|
||||
query_as,
|
||||
ResultR,
|
||||
"SELECT result #> $3 as result FROM {} WHERE id = $1 AND workspace_id = $2",
|
||||
"completed_jobs_result" || "completed_job",
|
||||
x,
|
||||
w_id,
|
||||
&path
|
||||
)?
|
||||
.and_then(|r| r.result.map(|x| x.0))
|
||||
.unwrap_or_else(|| to_raw_value(&serde_json::Value::Null));
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2643,7 +2671,7 @@ pub struct PushArgsOwned {
|
||||
pub args: HashMap<String, Box<RawValue>>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PushArgs<'c> {
|
||||
pub extra: Option<HashMap<String, Box<RawValue>>>,
|
||||
pub args: &'c HashMap<String, Box<RawValue>>,
|
||||
@@ -2754,7 +2782,7 @@ fn restructure_cloudevents_metadata(
|
||||
.unwrap_or_else(|| to_raw_value(&serde_json::Value::Null));
|
||||
let str = data.to_string();
|
||||
|
||||
let wrap_body = str.len() > 0 && str.chars().next().unwrap() != '{';
|
||||
let wrap_body = !str.is_empty() && !str.starts_with('{');
|
||||
|
||||
if wrap_body {
|
||||
let args = serde_json::from_str::<Option<Box<RawValue>>>(&str)
|
||||
@@ -2784,7 +2812,7 @@ impl PushArgsOwned {
|
||||
extra.insert("raw_string".to_string(), to_raw_value(&str));
|
||||
}
|
||||
|
||||
let wrap_body = force_wrap_body || str.len() > 0 && str.chars().next().unwrap() != '{';
|
||||
let wrap_body = force_wrap_body || !str.is_empty() && !str.starts_with('{');
|
||||
|
||||
if wrap_body {
|
||||
let args = serde_json::from_str::<Option<Box<RawValue>>>(&str)
|
||||
@@ -3895,6 +3923,10 @@ pub async fn push<'c, 'd, R: rsmq_async::RsmqConnection + Send + 'c>(
|
||||
};
|
||||
|
||||
tracing::debug!("Pushing job {job_id} with tag {tag}, schedule_path {schedule_path:?}, script_path: {script_path:?}, email {email}, workspace_id {workspace_id}");
|
||||
|
||||
let raw_flow = raw_flow.map(Json);
|
||||
let args = Json(args);
|
||||
|
||||
let uuid = sqlx::query_scalar!(
|
||||
"INSERT INTO queue
|
||||
(workspace_id, id, running, parent_job, created_by, permissioned_as, scheduled_for,
|
||||
@@ -3913,12 +3945,12 @@ pub async fn push<'c, 'd, R: rsmq_async::RsmqConnection + Send + 'c>(
|
||||
scheduled_for_o,
|
||||
script_hash,
|
||||
script_path.clone(),
|
||||
raw_code,
|
||||
raw_code.clone(),
|
||||
raw_lock,
|
||||
Json(args) as Json<PushArgs>,
|
||||
args.clone() as Json<PushArgs>,
|
||||
job_kind.clone() as JobKind,
|
||||
schedule_path,
|
||||
raw_flow.map(Json) as Option<Json<FlowValue>>,
|
||||
raw_flow.clone() as Option<Json<FlowValue>>,
|
||||
flow_status.map(Json) as Option<Json<FlowStatus>>,
|
||||
is_flow_step,
|
||||
language as Option<ScriptLang>,
|
||||
@@ -3927,7 +3959,7 @@ pub async fn push<'c, 'd, R: rsmq_async::RsmqConnection + Send + 'c>(
|
||||
email,
|
||||
visible_to_owner,
|
||||
root_job,
|
||||
tag,
|
||||
tag.clone(),
|
||||
concurrent_limit,
|
||||
if concurrent_limit.is_some() { concurrency_time_window_s } else { None },
|
||||
custom_timeout,
|
||||
@@ -3939,6 +3971,34 @@ pub async fn push<'c, 'd, R: rsmq_async::RsmqConnection + Send + 'c>(
|
||||
.await
|
||||
.map_err(|e| Error::InternalErr(format!("Could not insert into queue {job_id} with tag {tag}, schedule_path {schedule_path:?}, script_path: {script_path:?}, email {email}, workspace_id {workspace_id}: {e:#}")))?;
|
||||
|
||||
// insert into args queue
|
||||
sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO job_args (id, workspace_id, args, tag)
|
||||
VALUES ($1, $2, $3, $4)
|
||||
"#,
|
||||
uuid,
|
||||
workspace_id,
|
||||
args as Json<PushArgs>,
|
||||
&tag
|
||||
)
|
||||
.execute(&mut tx)
|
||||
.await?;
|
||||
|
||||
sqlx::query!(
|
||||
r#"
|
||||
INSERT INTO job_params (id, workspace_id, raw_code, raw_flow, tag)
|
||||
VALUES ($1, $2, $3, $4, $5)
|
||||
"#,
|
||||
uuid,
|
||||
workspace_id,
|
||||
raw_code,
|
||||
raw_flow as Option<Json<FlowValue>>,
|
||||
tag
|
||||
)
|
||||
.execute(&mut tx)
|
||||
.await?;
|
||||
|
||||
tracing::debug!("Pushed {job_id}");
|
||||
// TODO: technically the job isn't queued yet, as the transaction can be rolled back. Should be solved when moving these metrics to the queue abstraction.
|
||||
#[cfg(feature = "prometheus")]
|
||||
@@ -4059,11 +4119,8 @@ pub async fn push<'c, 'd, R: rsmq_async::RsmqConnection + Send + 'c>(
|
||||
}
|
||||
|
||||
pub fn canceled_job_to_result(job: &QueuedJob) -> serde_json::Value {
|
||||
let reason = job
|
||||
.canceled_reason
|
||||
.as_deref()
|
||||
.unwrap_or_else(|| "no reason given");
|
||||
let canceler = job.canceled_by.as_deref().unwrap_or_else(|| "unknown");
|
||||
let reason = job.canceled_reason.as_deref().unwrap_or("no reason given");
|
||||
let canceler = job.canceled_by.as_deref().unwrap_or("unknown");
|
||||
serde_json::json!({"message": format!("Job canceled: {reason} by {canceler}"), "name": "Canceled", "reason": reason, "canceler": canceler})
|
||||
}
|
||||
|
||||
@@ -4086,8 +4143,19 @@ async fn restarted_flows_resolution(
|
||||
),
|
||||
Error,
|
||||
> {
|
||||
let completed_job = sqlx::query_as::<_, CompletedJob>(
|
||||
"SELECT *, null as labels FROM completed_job WHERE id = $1 and workspace_id = $2",
|
||||
#[derive(Debug, sqlx::FromRow)]
|
||||
struct QueryResults {
|
||||
pub script_path: Option<String>,
|
||||
pub priority: Option<i16>,
|
||||
pub raw_flow: Option<sqlx::types::Json<Box<RawValue>>>,
|
||||
pub flow_status: Option<sqlx::types::Json<Box<RawValue>>>,
|
||||
}
|
||||
|
||||
impl_flow_status_getter!(QueryResults);
|
||||
impl_flow_value_getter!(QueryResults);
|
||||
|
||||
let completed_job = sqlx::query_as::<_, QueryResults>(
|
||||
"SELECT script_path, priority, raw_flow, flow_status FROM completed_job WHERE id = $1 and workspace_id = $2",
|
||||
)
|
||||
.bind(completed_flow_id)
|
||||
.bind(workspace_id)
|
||||
@@ -4120,10 +4188,9 @@ async fn restarted_flows_resolution(
|
||||
if flow_value_if_any
|
||||
.clone()
|
||||
.map(|fv| {
|
||||
fv.modules
|
||||
!fv.modules
|
||||
.iter()
|
||||
.find(|flow_value_module| flow_value_module.id == module.id())
|
||||
.is_none()
|
||||
.any(|flow_value_module| flow_value_module.id == module.id())
|
||||
})
|
||||
.unwrap_or(false)
|
||||
{
|
||||
@@ -4239,7 +4306,7 @@ async fn restarted_flows_resolution(
|
||||
truncated_modules.push(FlowStatusModule::WaitingForPriorSteps { id: module.id() });
|
||||
} else {
|
||||
// else we simply "transfer" the module from the completed flow to the new one if it's a success
|
||||
step_n = step_n + 1;
|
||||
step_n += 1;
|
||||
match module.clone() {
|
||||
FlowStatusModule::Success { .. } => Ok(truncated_modules.push(module)),
|
||||
_ => Err(Error::InternalErr(format!(
|
||||
|
||||
@@ -29,20 +29,20 @@ use sqlx::FromRow;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tracing::instrument;
|
||||
use uuid::Uuid;
|
||||
use windmill_common::add_time;
|
||||
use windmill_common::auth::JobPerms;
|
||||
#[cfg(feature = "benchmark")]
|
||||
use windmill_common::bench::BenchmarkIter;
|
||||
use windmill_common::db::Authed;
|
||||
use windmill_common::flow_status::{
|
||||
ApprovalConditions, FlowStatusModuleWParent, Iterator, JobResult,
|
||||
ApprovalConditions, FlowStatusModuleWParent, Iterator, JobResult, ParsedFlowStatusGetter,
|
||||
};
|
||||
use windmill_common::flows::add_virtual_items_if_necessary;
|
||||
use windmill_common::flows::{add_virtual_items_if_necessary, ParsedFlowValueGetter};
|
||||
use windmill_common::jobs::{
|
||||
script_hash_to_tag_and_limits, script_path_to_payload, BranchResults, JobPayload, QueuedJob,
|
||||
RawCode, ENTRYPOINT_OVERRIDE,
|
||||
};
|
||||
use windmill_common::worker::to_raw_value;
|
||||
use windmill_common::{add_time, fetch_one_with_fallback, fetch_optional_with_fallback};
|
||||
use windmill_common::{
|
||||
error::{self, to_anyhow, Error},
|
||||
flow_status::{
|
||||
@@ -61,6 +61,19 @@ type DB = sqlx::Pool<sqlx::Postgres>;
|
||||
|
||||
use windmill_queue::{canceled_job_to_result, get_queued_job_tx, push, QueueTransaction};
|
||||
|
||||
async fn get_args_from_job_id(db: &DB, id: &Uuid) -> Result<RowArgs, Error> {
|
||||
let args = fetch_one_with_fallback!(
|
||||
db,
|
||||
query_as,
|
||||
RowArgs,
|
||||
"SELECT args FROM {} WHERE id = $1",
|
||||
"job_args" || "queue",
|
||||
id
|
||||
);
|
||||
|
||||
args.map_err(|e| Error::InternalErr(format!("retrieval of args from state: {e:#}")))
|
||||
}
|
||||
|
||||
// #[instrument(level = "trace", skip_all)]
|
||||
pub async fn update_flow_status_after_job_completion<
|
||||
R: rsmq_async::RsmqConnection + Send + Sync + Clone,
|
||||
@@ -229,10 +242,10 @@ pub async fn update_flow_status_after_job_completion_internal<
|
||||
})?;
|
||||
|
||||
let old_status = serde_json::from_str::<FlowStatus>(old_status_json.flow_status.get())
|
||||
.or_else(|e| {
|
||||
Err(Error::InternalErr(format!(
|
||||
.map_err(|e| {
|
||||
Error::InternalErr(format!(
|
||||
"requiring status to be parsable as FlowStatus: {e:?}"
|
||||
)))
|
||||
))
|
||||
})?;
|
||||
|
||||
let current_module = if let Some(x) = old_status_json.current_module {
|
||||
@@ -289,7 +302,7 @@ pub async fn update_flow_status_after_job_completion_internal<
|
||||
|
||||
// 0 length flows are not failure steps
|
||||
let is_failure_step =
|
||||
old_status.step >= old_status.modules.len() as i32 && old_status.modules.len() > 0;
|
||||
old_status.step >= old_status.modules.len() as i32 && !old_status.modules.is_empty();
|
||||
|
||||
let (mut stop_early, mut skip_if_stop_early, continue_on_error) = if let Some(se) =
|
||||
stop_early_override
|
||||
@@ -302,14 +315,13 @@ pub async fn update_flow_status_after_job_completion_internal<
|
||||
};
|
||||
|
||||
let is_flow = if let Some(step) = step {
|
||||
sqlx::query_scalar!(
|
||||
"SELECT raw_flow->'modules'->($1)->'value'->>'type' = 'flow' FROM queue WHERE id = $2",
|
||||
fetch_one_with_fallback!(db,
|
||||
query_scalar,
|
||||
Option<bool>,
|
||||
"SELECT raw_flow->'modules'->($1)->'value'->>'type' = 'flow' FROM {} WHERE id = $2",
|
||||
"job_params" || "queue",
|
||||
step as i32,
|
||||
&flow
|
||||
)
|
||||
.fetch_one(db)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
&flow).map_err(|e| {
|
||||
Error::InternalErr(format!("error during retrieval of step's type: {e:#}"))
|
||||
})?
|
||||
.unwrap_or(false)
|
||||
@@ -342,19 +354,9 @@ pub async fn update_flow_status_after_job_completion_internal<
|
||||
}
|
||||
_ => None,
|
||||
};
|
||||
let args = sqlx::query_as::<_, RowArgs>(
|
||||
"SELECT
|
||||
args
|
||||
FROM queue
|
||||
WHERE id = $2",
|
||||
)
|
||||
.bind(old_status.step)
|
||||
.bind(flow)
|
||||
.fetch_one(db)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
Error::InternalErr(format!("retrieval of args from state: {e:#}"))
|
||||
})?;
|
||||
|
||||
let args = get_args_from_job_id(db, &flow).await?;
|
||||
|
||||
compute_bool_from_expr(
|
||||
expr.to_string(),
|
||||
Marc::new(args.args.unwrap_or_default().0),
|
||||
@@ -442,7 +444,7 @@ pub async fn update_flow_status_after_job_completion_internal<
|
||||
None
|
||||
};
|
||||
|
||||
let nindex = if let Some(position) = position {
|
||||
let nindex = if let Some(position) = position {
|
||||
sqlx::query_scalar!(
|
||||
"UPDATE queue
|
||||
SET flow_status = JSONB_SET(
|
||||
@@ -489,7 +491,7 @@ pub async fn update_flow_status_after_job_completion_internal<
|
||||
None
|
||||
};
|
||||
|
||||
let nindex = if let Some(position) = position {
|
||||
let nindex = if let Some(position) = position {
|
||||
sqlx::query_scalar!(
|
||||
"UPDATE queue
|
||||
SET flow_status = JSONB_SET(
|
||||
@@ -658,7 +660,7 @@ pub async fn update_flow_status_after_job_completion_internal<
|
||||
flow_jobs_success,
|
||||
flow_jobs,
|
||||
..
|
||||
} if branch.to_owned() < len - 1 && (success || skip_branch_failure) => {
|
||||
} if *branch < len - 1 && (success || skip_branch_failure) => {
|
||||
if let Some(jobs) = flow_jobs {
|
||||
set_success_in_flow_job_success(
|
||||
flow_jobs_success,
|
||||
@@ -870,19 +872,7 @@ pub async fn update_flow_status_after_job_completion_internal<
|
||||
.as_ref()
|
||||
.and_then(|m| m.stop_after_all_iters_if.as_ref().map(|x| x.expr.clone()))
|
||||
{
|
||||
let args = sqlx::query_as::<_, RowArgs>(
|
||||
"SELECT
|
||||
args
|
||||
FROM queue
|
||||
WHERE id = $2",
|
||||
)
|
||||
.bind(old_status.step)
|
||||
.bind(flow)
|
||||
.fetch_one(db)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
Error::InternalErr(format!("retrieval of args from state: {e:#}"))
|
||||
})?;
|
||||
let args = get_args_from_job_id(db, &flow).await?;
|
||||
|
||||
let should_stop = compute_bool_from_expr(
|
||||
expr.to_string(),
|
||||
@@ -1244,7 +1234,7 @@ fn get_module(flow_job: &QueuedJob, module_step: &Step) -> Option<FlowModule> {
|
||||
if let Some(raw_flow) = raw_flow {
|
||||
match module_step {
|
||||
Step::PreprocessorStep => raw_flow.preprocessor_module.map(|x| *x.clone()),
|
||||
Step::Step(i) => raw_flow.modules.get(*i).map(|x| x.clone()),
|
||||
Step::Step(i) => raw_flow.modules.get(*i).cloned(),
|
||||
Step::FailureStep => raw_flow.failure_module.map(|x| *x.clone()),
|
||||
}
|
||||
} else {
|
||||
@@ -1269,8 +1259,7 @@ async fn compute_skip_branchall_failure<'c>(
|
||||
.map(|p| {
|
||||
BRANCHALL_INDEX_RE
|
||||
.captures(&p)
|
||||
.map(|x| x.get(1).unwrap().as_str().parse::<i32>().ok())
|
||||
.flatten()
|
||||
.and_then(|x| x.get(1).unwrap().as_str().parse::<i32>().ok())
|
||||
.ok_or(Error::InternalErr(format!(
|
||||
"could not parse branchall index from path: {p}"
|
||||
)))
|
||||
@@ -1291,14 +1280,14 @@ async fn compute_skip_branchall_failure<'c>(
|
||||
}
|
||||
|
||||
async fn has_failure_module<'c>(flow: Uuid, db: &DB) -> Result<bool, Error> {
|
||||
sqlx::query_scalar::<_, Option<bool>>(
|
||||
"SELECT raw_flow->'failure_module' != 'null'::jsonb
|
||||
FROM queue
|
||||
WHERE id = $1",
|
||||
fetch_one_with_fallback!(
|
||||
db,
|
||||
query_scalar,
|
||||
Option<bool>,
|
||||
"SELECT raw_flow->'failure_module' != 'null'::jsonb FROM {} WHERE id = $1",
|
||||
"job_params" || "queue",
|
||||
flow
|
||||
)
|
||||
.bind(flow)
|
||||
.fetch_one(db)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
Error::InternalErr(format!(
|
||||
"error during retrieval of has_failure_module: {e:#}"
|
||||
@@ -1610,6 +1599,7 @@ pub struct ResumeRow {
|
||||
|
||||
#[derive(FromRow)]
|
||||
pub struct RawArgs {
|
||||
#[allow(dead_code)]
|
||||
pub args: Option<Json<HashMap<String, Box<RawValue>>>>,
|
||||
}
|
||||
|
||||
@@ -1638,6 +1628,7 @@ fn potentially_crash_for_testing() {
|
||||
lazy_static::lazy_static! {
|
||||
pub static ref EHM: HashMap<String, Box<RawValue>> = HashMap::new();
|
||||
}
|
||||
|
||||
// #[async_recursion]
|
||||
// #[instrument(level = "trace", skip_all)]
|
||||
async fn push_next_flow_job<R: rsmq_async::RsmqConnection + Send + Sync + Clone>(
|
||||
@@ -1725,7 +1716,7 @@ async fn push_next_flow_job<R: rsmq_async::RsmqConnection + Send + Sync + Clone>
|
||||
flow_job.workspace_id.as_str(),
|
||||
flow_job.id
|
||||
).fetch_all(db).await?;
|
||||
if overlapping.len() > 0 {
|
||||
if !overlapping.is_empty() {
|
||||
let overlapping_str = overlapping
|
||||
.iter()
|
||||
.map(|x| x.to_string())
|
||||
@@ -1804,8 +1795,8 @@ async fn push_next_flow_job<R: rsmq_async::RsmqConnection + Send + Sync + Clone>
|
||||
// else pass the last job result. Either from the function arg if it's set, or manually fetch it from the previous job
|
||||
// having last_job_result empty can happen either when the job was suspended and is being restarted, or if it's a
|
||||
// flow restart from a specific step
|
||||
if last_job_result.is_some() {
|
||||
last_job_result.unwrap()
|
||||
if let Some(last_job_result) = last_job_result {
|
||||
last_job_result
|
||||
} else {
|
||||
match get_previous_job_result(db, flow_job.workspace_id.as_str(), &status).await? {
|
||||
None => Arc::new(to_raw_value(&json!("{}"))),
|
||||
@@ -2260,17 +2251,18 @@ async fn push_next_flow_job<R: rsmq_async::RsmqConnection + Send + Sync + Clone>
|
||||
);
|
||||
Ok(Marc::new(hm))
|
||||
} else if let Some(id) = get_args_from_id {
|
||||
let row = sqlx::query_as::<_, RawArgs>(
|
||||
"SELECT args FROM completed_job WHERE id = $1 AND workspace_id = $2",
|
||||
)
|
||||
.bind(id)
|
||||
.bind(&flow_job.workspace_id)
|
||||
.fetch_optional(db)
|
||||
.await?;
|
||||
let row = fetch_optional_with_fallback!(
|
||||
db,
|
||||
query_as,
|
||||
RowArgs,
|
||||
"SELECT args FROM {} WHERE id = $1 AND workspace_id = $2",
|
||||
"job_args" || "completed_job",
|
||||
id,
|
||||
&flow_job.workspace_id
|
||||
)?;
|
||||
|
||||
if let Some(raw_args) = row {
|
||||
Ok(Marc::new(
|
||||
raw_args.args.map(|x| x.0).unwrap_or_else(HashMap::new),
|
||||
))
|
||||
Ok(Marc::new(raw_args.args.map(|x| x.0).unwrap_or_default()))
|
||||
} else {
|
||||
Ok(Marc::new(HashMap::new()))
|
||||
}
|
||||
@@ -2389,7 +2381,7 @@ async fn push_next_flow_job<R: rsmq_async::RsmqConnection + Send + Sync + Clone>
|
||||
|
||||
let mut tx: QueueTransaction<'_, R> = (rsmq.clone(), db.begin().await?).into();
|
||||
let nargs = args.as_ref();
|
||||
for i in (0..len).into_iter() {
|
||||
for i in 0..len {
|
||||
if i % 100 == 0 && i != 0 {
|
||||
tracing::info!(id = %flow_job.id, root_id = %job_root, "pushed (non-commited yet) first {i} subflows of {len}");
|
||||
sqlx::query!(
|
||||
@@ -3902,13 +3894,15 @@ async fn get_previous_job_result(
|
||||
Ok(Some(retrieve_flow_jobs_results(db, w_id, flow_jobs).await?))
|
||||
}
|
||||
Some(FlowStatusModule::Success { job, .. }) => Ok(Some(
|
||||
sqlx::query_scalar::<_, Json<Box<RawValue>>>(
|
||||
"SELECT result FROM completed_job WHERE id = $1 AND workspace_id = $2",
|
||||
)
|
||||
.bind(job)
|
||||
.bind(w_id)
|
||||
.fetch_one(db)
|
||||
.await?
|
||||
fetch_one_with_fallback!(
|
||||
db,
|
||||
query_scalar,
|
||||
Json<Box<RawValue>>,
|
||||
"SELECT result FROM {} WHERE id = $1 AND workspace_id = $2",
|
||||
"completed_jobs_result" || "completed_job",
|
||||
job,
|
||||
w_id
|
||||
)?
|
||||
.0,
|
||||
)),
|
||||
_ => Ok(None),
|
||||
|
||||
3
frontend/.gitignore
vendored
3
frontend/.gitignore
vendored
@@ -9,4 +9,5 @@ tests-out/
|
||||
storageState.json
|
||||
.env.production
|
||||
dist/
|
||||
static/tsdocs/
|
||||
static/tsdocs/
|
||||
!build/.gitkeep
|
||||
Reference in New Issue
Block a user