Compare commits

...

1 Commits

Author SHA1 Message Date
Abel Lucas
75f2e39418 backend: make pull atomic again 2025-02-06 15:27:41 +01:00

View File

@@ -2060,6 +2060,7 @@ async fn pull_single_job_and_mark_as_running_no_concurrency_limit<'c>(
db: &Pool<Postgres>,
suspend_first: bool,
) -> windmill_common::error::Result<(Option<PulledJob>, bool)> {
let mut tx = db.begin().await?;
let job_and_suspended: (Option<PulledJob>, bool) = {
/* Jobs can be started if they:
* - haven't been started before,
@@ -2078,7 +2079,7 @@ async fn pull_single_job_and_mark_as_running_no_concurrency_limit<'c>(
let r = if suspend_first {
// tracing::info!("Pulling job with query: {}", query);
sqlx::query_as::<_, PulledJob>(&query)
.fetch_optional(db)
.fetch_optional(&mut *tx)
.await?
} else {
None
@@ -2098,7 +2099,7 @@ async fn pull_single_job_and_mark_as_running_no_concurrency_limit<'c>(
for query in queries.iter() {
// tracing::info!("Pulling job with query: {}", query);
let r = sqlx::query_as::<_, PulledJob>(query)
.fetch_optional(db)
.fetch_optional(&mut *tx)
.await?;
if let Some(pulled_job) = r {
@@ -2115,6 +2116,7 @@ async fn pull_single_job_and_mark_as_running_no_concurrency_limit<'c>(
(r, true)
}
};
tx.commit().await?;
Ok(job_and_suspended)
}