Compare commits

...

2 Commits

Author SHA1 Message Date
HugoCasa
1da430b3a2 perf: optimize batch pull query with ANY(ARRAY()) and chained CTEs
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-04 17:00:42 +01:00
HugoCasa
34020b5422 feat: batch pull for native polling workers
Add a batch pull mechanism for native mode workers. Instead of each
worker independently polling the DB, a single poller pulls up to N
jobs in one query and dispatches them via an mpsc channel.

- Add `pull_batch()` in windmill-queue with batch SQL query
- Add `run_native_poller()` loop with semaphore-based backpressure
- Workers consume jobs via `try_recv()` on a shared channel
- Detect native mode from both env and DB worker group config
- Add `ScriptLang::tag_str()` to centralize bunnative→nativets mapping
- Fix `add_batch_jobs` tag assignment to use consistent mapping

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 19:52:48 +01:00
9 changed files with 319 additions and 25 deletions

View File

@@ -1822,11 +1822,53 @@ pub async fn run_workers(
.expect("could not create initial worker dir");
}
let native_mode = NATIVE_MODE_RESOLVED.load(std::sync::atomic::Ordering::Relaxed);
tracing::info!(
"Starting {num_workers} workers and SLEEP_QUEUE={}ms",
"Starting {num_workers} workers (native_polling={native_mode}) and SLEEP_QUEUE={}ms",
*windmill_worker::SLEEP_QUEUE
);
let native_job_rx = if native_mode && num_workers > 1 {
let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(num_workers));
let (job_tx, job_rx) =
tokio::sync::mpsc::channel::<windmill_worker::NativeJobDispatch>(num_workers);
let native_job_rx: windmill_worker::NativeJobRx =
std::sync::Arc::new(std::sync::Mutex::new(job_rx));
let first_conn = &workers[0];
match &first_conn.conn {
Connection::Sql(db) => {
let poller_killpill_rx = rx.resubscribe();
let poller_name = format!(
"{}-poller",
first_conn
.worker_name
.rsplit_once('-')
.map_or(first_conn.worker_name.as_str(), |(prefix, _)| prefix)
);
let db = db.clone();
handles.push(tokio::spawn(async move {
windmill_worker::run_native_poller(
&db,
job_tx,
semaphore,
poller_killpill_rx,
&poller_name,
)
.await;
}));
Some(native_job_rx)
}
Connection::Http(_) => {
tracing::warn!("native polling mode is not supported with HTTP connections, falling back to individual polling");
None
}
}
} else {
None
};
for i in 1..(num_workers + 1) {
let wk_conf = &workers[i as usize - 1];
let conn1 = wk_conf.conn.clone();
@@ -1837,6 +1879,7 @@ pub async fn run_workers(
let tx = tx.clone();
let base_internal_url = base_internal_url.clone();
let hostname = hostname.clone();
let native_job_rx = native_job_rx.clone();
handles.push(tokio::spawn(async move {
if num_workers > 1 {
@@ -1853,6 +1896,7 @@ pub async fn run_workers(
rx,
tx,
&base_internal_url,
native_job_rx,
);
// #[cfg(tokio_unstable)]

View File

@@ -71,7 +71,7 @@ use windmill_common::{
worker::{
load_env_vars, load_init_bash_from_env, load_periodic_bash_script_from_env,
load_periodic_bash_script_interval_from_env, load_whitelist_env_vars_from_env,
load_worker_config, reload_custom_tags_setting, store_pull_query,
load_worker_config, reload_custom_tags_setting, store_pull_query, store_pull_query_batch,
store_suspended_pull_query, Connection, WorkerConfig, DEFAULT_TAGS_PER_WORKSPACE,
DEFAULT_TAGS_WORKSPACES, INDEXER_CONFIG, SCRIPT_TOKEN_EXPIRY, SMTP_CONFIG, TMP_DIR,
WORKER_CONFIG, WORKER_GROUP,
@@ -2268,6 +2268,7 @@ pub async fn reload_worker_config(db: &DB, tx: KillpillSender, kill_if_change: b
tracing::info!("Reloading worker config...");
store_suspended_pull_query(&config).await;
store_pull_query(&config).await;
store_pull_query_batch(&config).await;
*wc = config
}
}

View File

@@ -230,6 +230,7 @@ fn spawn_workers(
}];
windmill_common::worker::store_suspended_pull_query(&wc).await;
windmill_common::worker::store_pull_query(&wc).await;
windmill_common::worker::store_pull_query_batch(&wc).await;
}
windmill_worker::run_worker(
&conn,
@@ -241,6 +242,7 @@ fn spawn_workers(
rx,
tx2,
&base_internal_url,
None,
)
.await;
};

View File

@@ -5343,12 +5343,16 @@ async fn add_batch_jobs(
if dedicated_worker && path.is_some() {
windmill_common::worker::dedicated_worker_tag(&w_id, &path.clone().unwrap())
} else {
format!("{}", language.as_str())
language
.tag_str(job_kind == JobKind::Dependencies)
.to_string()
}
} else if let Some(tag) = batch_info.tag {
tag
} else {
format!("{}", language.as_str())
language
.tag_str(job_kind == JobKind::Dependencies)
.to_string()
};
let mut tx = user_db.begin(&authed).await?;

View File

@@ -233,6 +233,7 @@ lazy_static::lazy_static! {
}));
pub static ref WORKER_PULL_QUERIES: Arc<RwLock<Vec<String>>> = Arc::new(RwLock::new(vec![]));
pub static ref WORKER_PULL_QUERIES_BATCH: Arc<RwLock<Vec<String>>> = Arc::new(RwLock::new(vec![]));
pub static ref WORKER_SUSPENDED_PULL_QUERY: Arc<RwLock<String>> = Arc::new(RwLock::new("".to_string()));
@@ -490,6 +491,69 @@ pub async fn store_pull_query(wc: &WorkerConfig) {
*l = queries;
}
/// Wraps a "peek" sub-query into the full batch pull statement.
///
/// `peek` must be a `SELECT` that yields the `id`s of the queue rows to claim. The
/// resulting statement chains three CTEs and a final `SELECT`:
///
/// - `peek`: the caller-supplied selection, inserted verbatim;
/// - `q`: marks the selected `v2_job_queue` rows as running, sets `started_at` if it
///   was unset, clears `suspend_until`, and records `$1` as the worker;
/// - `r`: sets `ping = now()` in `v2_job_runtime` for the rows updated by `q`;
/// - the final `SELECT` joins `q` with `v2_job`, `v2_job_status`, `job_perms` and the
///   parent `v2_job` row to return one full row per claimed job.
///
/// Any bind placeholders contained in `peek` are passed through unchanged.
fn format_pull_query_batch(peek: String) -> String {
format!(
"WITH peek AS (
{}
), q AS NOT MATERIALIZED (
UPDATE v2_job_queue SET
running = true,
started_at = coalesce(started_at, now()),
suspend_until = null,
worker = $1
WHERE id = ANY(ARRAY(SELECT id FROM peek))
RETURNING
id, started_at, scheduled_for,
canceled_by, canceled_reason, worker, cache_ignore_s3_path, runnable_settings_handle
), r AS NOT MATERIALIZED (
UPDATE v2_job_runtime SET
ping = now()
WHERE id = ANY(ARRAY(SELECT id FROM q))
) SELECT q.id, j.workspace_id, j.parent_job, j.created_by, q.started_at, q.scheduled_for,
j.runnable_id, j.runnable_path, j.args, q.canceled_by,
q.canceled_reason, j.kind, j.trigger, j.trigger_kind, j.permissioned_as,
flow_status, j.script_lang,
j.same_worker, j.pre_run_error, j.visible_to_owner,
j.tag, j.concurrent_limit, j.concurrency_time_window_s, j.flow_innermost_root_job, j.root_job,
j.timeout, j.flow_step_id, j.cache_ttl, q.cache_ignore_s3_path, q.runnable_settings_handle, j.priority, j.raw_code, j.raw_lock, j.raw_flow,
j.script_entrypoint_override, j.preprocessed, COALESCE(pj.runnable_path, j.args->>'_FLOW_PATH') as parent_runnable_path,
COALESCE(p.email, j.permissioned_as_email) as permissioned_as_email, p.username as permissioned_as_username, p.is_admin as permissioned_as_is_admin,
p.is_operator as permissioned_as_is_operator, p.groups as permissioned_as_groups, p.folders as permissioned_as_folders, p.end_user_email as permissioned_as_end_user_email
FROM q JOIN v2_job j ON j.id = q.id
LEFT JOIN v2_job_status f ON f.id = q.id
LEFT JOIN job_perms p ON p.job_id = q.id
LEFT JOIN v2_job pj ON j.parent_job = pj.id
",
peek
)
}
pub fn make_pull_query_batch(tags: &[String]) -> String {
format_pull_query_batch(format!(
"SELECT id
FROM v2_job_queue
WHERE running = false AND tag IN ({}) AND scheduled_for <= now()
ORDER BY priority DESC NULLS LAST, scheduled_for
FOR UPDATE SKIP LOCKED
LIMIT $2",
tags.iter().map(|x| format!("'{x}'")).join(", ")
))
}
/// Regenerates the batch pull queries from `wc` and publishes them in
/// `WORKER_PULL_QUERIES_BATCH`.
///
/// One query is produced per entry of `wc.priority_tags_sorted`, in the same order.
/// Entries with no tags are reported with an error log and left out. The previous
/// list is replaced wholesale once the new one is fully built.
pub async fn store_pull_query_batch(wc: &WorkerConfig) {
    let batch_queries: Vec<String> = wc
        .priority_tags_sorted
        .iter()
        .filter_map(|group| {
            if group.tags.is_empty() {
                tracing::error!("Empty tags in priority tags, skipping");
                None
            } else {
                Some(make_pull_query_batch(&group.tags))
            }
        })
        .collect();

    // The write lock is only taken after all queries have been built.
    *WORKER_PULL_QUERIES_BATCH.write().await = batch_queries;
}
pub const TMP_DIR: &str = "/tmp/windmill";
pub const TMP_LOGS_DIR: &str = concatcp!(TMP_DIR, "/logs");

View File

@@ -74,7 +74,7 @@ use windmill_common::{
utils::{not_found_if_none, report_critical_error, StripPath, WarnAfterExt},
worker::{
to_raw_value, CLOUD_HOSTED, DISABLE_FLOW_SCRIPT, NO_LOGS, WORKER_PULL_QUERIES,
WORKER_SUSPENDED_PULL_QUERY,
WORKER_PULL_QUERIES_BATCH, WORKER_SUSPENDED_PULL_QUERY,
},
DB, METRICS_ENABLED,
};
@@ -3434,6 +3434,65 @@ async fn pull_single_job_and_mark_as_running_no_concurrency_limit<'c>(
Ok(job_and_suspended)
}
pub async fn pull_batch(
db: &Pool<Postgres>,
worker_name: &str,
limit: i32,
) -> windmill_common::error::Result<Vec<PulledJob>> {
let queries = WORKER_PULL_QUERIES_BATCH.read().await;
if queries.is_empty() {
tracing::warn!("No batch pull queries available");
return Ok(vec![]);
}
let mut results: Vec<PulledJob> = Vec::new();
let mut remaining = limit;
for query in queries.iter() {
if remaining <= 0 {
break;
}
let jobs = timeout(
Duration::from_secs(15),
sqlx::query_as::<_, PulledJob>(query)
.bind(worker_name)
.bind(remaining)
.fetch_all(db),
)
.await??;
for job in jobs {
if job.is_flow() || job.is_dependency() {
let per_workspace = per_workspace_tag(&job.workspace_id).await;
let base_tag = if job.is_flow() {
"flow".to_string()
} else {
"dependency".to_string()
};
let tag = if per_workspace {
format!("{}-{}", base_tag, job.workspace_id)
} else {
base_tag
};
sqlx::query!(
"UPDATE v2_job_queue SET tag = $1, running = false WHERE id = $2",
tag,
job.id
)
.execute(db)
.await?;
continue;
}
remaining -= 1;
results.push(job);
}
}
Ok(results)
}
pub async fn custom_concurrency_key(
db: &Pool<Postgres>,
job_id: &Uuid,
@@ -5373,15 +5432,7 @@ async fn push_inner<'c, 'd>(
language
.as_ref()
.map(|x| {
let tag_lang = if x == &ScriptLang::Bunnative {
if job_kind == JobKind::Dependencies {
ScriptLang::Bun.as_str()
} else {
ScriptLang::Nativets.as_str()
}
} else {
x.as_str()
};
let tag_lang = x.tag_str(job_kind == JobKind::Dependencies);
if per_workspace {
format!("{}-{}", tag_lang, workspace_id)
} else {

View File

@@ -401,6 +401,7 @@ pub fn spawn_test_worker(
}];
windmill_common::worker::store_suspended_pull_query(&wc).await;
windmill_common::worker::store_pull_query(&wc).await;
windmill_common::worker::store_pull_query_batch(&wc).await;
}
windmill_worker::run_worker(
&conn,
@@ -412,6 +413,7 @@ pub fn spawn_test_worker(
rx,
tx2,
&base_internal_url,
None,
)
.await
};

View File

@@ -88,6 +88,18 @@ impl ScriptLang {
}
}
/// Returns the language name to use when deriving a queue tag for this language.
///
/// `Bunnative` has no tag of its own: it maps to the `Bun` name for dependency jobs
/// and to the `Nativets` name otherwise. Every other language maps to its own
/// `as_str()` value, regardless of `is_dependency`.
pub fn tag_str(&self, is_dependency: bool) -> &'static str {
    match (self, is_dependency) {
        (ScriptLang::Bunnative, true) => ScriptLang::Bun.as_str(),
        (ScriptLang::Bunnative, false) => ScriptLang::Nativets.as_str(),
        (lang, _) => lang.as_str(),
    }
}
pub fn as_dependencies_filename(&self) -> Option<String> {
use ScriptLang::*;
Some(
@@ -105,15 +117,15 @@ impl ScriptLang {
/// Returns `true` when the language is one of `Bunnative`, `Nativets`, `Postgresql`,
/// `Mysql`, `Graphql`, `Snowflake`, `Mssql`, `Bigquery` or `OracleDB`.
// NOTE(review): the variant list below appears twice, once with trailing `|` and once
// with leading `|`. This looks like the removed and added sides of a formatting-only
// change rendered together — confirm that the real file contains a single copy.
pub fn is_native(&self) -> bool {
matches!(
self,
ScriptLang::Bunnative |
ScriptLang::Nativets |
ScriptLang::Postgresql |
ScriptLang::Mysql |
ScriptLang::Graphql |
ScriptLang::Snowflake |
ScriptLang::Mssql |
ScriptLang::Bigquery |
ScriptLang::OracleDB
ScriptLang::Bunnative
| ScriptLang::Nativets
| ScriptLang::Postgresql
| ScriptLang::Mysql
| ScriptLang::Graphql
| ScriptLang::Snowflake
| ScriptLang::Mssql
| ScriptLang::Bigquery
| ScriptLang::OracleDB
)
}

View File

@@ -72,6 +72,7 @@ use windmill_parser::MainArgSignature;
use windmill_queue::DedicatedWorkerJob;
use windmill_queue::FlowRunners;
use windmill_queue::MiniCompletedJob;
use windmill_queue::PulledJobResult;
use windmill_queue::PulledJobResultToJobErr;
use uuid::Uuid;
@@ -89,8 +90,8 @@ use windmill_common::{
};
use windmill_queue::{
append_logs, canceled_job_to_result, empty_result, get_same_worker_job, pull, push_init_job,
push_periodic_bash_job, CanceledBy, JobAndPerms, JobCompleted, MiniPulledJob,
append_logs, canceled_job_to_result, empty_result, get_same_worker_job, pull, pull_batch,
push_init_job, push_periodic_bash_job, CanceledBy, JobAndPerms, JobCompleted, MiniPulledJob,
PrecomputedAgentInfo, PulledJob, SameWorkerPayload, HTTP_CLIENT, INIT_SCRIPT_TAG,
PERIODIC_SCRIPT_TAG,
};
@@ -256,6 +257,13 @@ const VACUUM_PERIOD: u32 = 10000;
pub const MAX_BUFFERED_DEDICATED_JOBS: usize = 3;
/// A job handed from the native poller to a worker, bundled with the semaphore permit
/// that was reserved for it.
pub struct NativeJobDispatch {
/// The pulled job together with its preprocessing outcome.
pub job: PulledJobResult,
/// Permit taken by the poller before sending. The receiving worker keeps it and
/// drops it at the start of its next loop iteration, which lets the poller pull
/// another job.
pub permit: tokio::sync::OwnedSemaphorePermit,
}
/// Receiving end of the poller's dispatch channel, behind `Arc<std::sync::Mutex<_>>`
/// so that a clone can be handed to each worker.
pub type NativeJobRx = Arc<std::sync::Mutex<tokio::sync::mpsc::Receiver<NativeJobDispatch>>>;
/// Per-language OTEL tracing proxy configuration.
/// Default languages are configured in frontend instanceSettings.ts
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
@@ -1348,6 +1356,87 @@ pub async fn create_job_dir(worker_directory: &str, job_id: impl Display) -> Str
job_dir_path
}
pub async fn run_native_poller(
db: &sqlx::Pool<sqlx::Postgres>,
job_tx: tokio::sync::mpsc::Sender<NativeJobDispatch>,
semaphore: Arc<tokio::sync::Semaphore>,
mut killpill_rx: tokio::sync::broadcast::Receiver<()>,
worker_name: &str,
) {
loop {
match killpill_rx.try_recv() {
Ok(_) | Err(tokio::sync::broadcast::error::TryRecvError::Closed) => {
tracing::info!(worker = %worker_name, "native poller: killpill received, exiting");
break;
}
_ => {}
}
let available = semaphore.available_permits();
if available == 0 {
tokio::time::sleep(Duration::from_millis(50)).await;
continue;
}
let jobs = match timeout(
Duration::from_secs(30),
pull_batch(db, worker_name, available as i32),
)
.await
{
Ok(Ok(jobs)) => jobs,
Ok(Err(e)) => {
tracing::error!(worker = %worker_name, "native poller: pull_batch error: {e}");
tokio::time::sleep(Duration::from_millis(*SLEEP_QUEUE)).await;
continue;
}
Err(_) => {
tracing::error!(worker = %worker_name, "native poller: pull_batch timed out");
continue;
}
};
if jobs.is_empty() {
tokio::time::sleep(Duration::from_millis(*SLEEP_QUEUE)).await;
continue;
}
for pulled_job in jobs {
let permit = semaphore
.clone()
.try_acquire_owned()
.expect("permits can only increase between available_permits() and here");
let mut job = PulledJobResult {
job: Some(pulled_job),
suspended: false,
missing_concurrency_key: false,
error_while_preprocessing: None,
};
if let Err(e) = timeout(
core::time::Duration::from_secs(10),
job.maybe_apply_debouncing(db),
)
.await
.map_err(error::Error::from)
.and_then(|r| r)
{
job.error_while_preprocessing = Some(e.to_string());
}
if job_tx
.send(NativeJobDispatch { job, permit })
.await
.is_err()
{
tracing::info!(worker = %worker_name, "native poller: channel closed, exiting");
return;
}
}
}
}
pub async fn run_worker(
conn: &Connection,
hostname: &str,
@@ -1358,6 +1447,7 @@ pub async fn run_worker(
mut killpill_rx: tokio::sync::broadcast::Receiver<()>,
killpill_tx: KillpillSender,
base_internal_url: &str,
native_job_rx: Option<NativeJobRx>,
) {
#[cfg(not(feature = "enterprise"))]
if is_sandboxing_enabled() {
@@ -1818,10 +1908,13 @@ pub async fn run_worker(
let mut last_30jobs_suspended = 0;
let mut last_suspend_first = Instant::now();
let mut killed_but_draining_same_worker_jobs = false;
let mut native_permit: Option<tokio::sync::OwnedSemaphorePermit> = None;
let mut killpill_rx2 = killpill_rx.resubscribe();
loop {
drop(native_permit.take());
let last_processing_duration_secs = last_processing_duration.load(Ordering::SeqCst);
if last_processing_duration_secs > 5 {
let sleep_duration = if last_processing_duration_secs > 10 {
@@ -2057,6 +2150,27 @@ pub async fn run_worker(
tokio::time::sleep(Duration::from_millis(200)).await;
continue;
}
} else if let Some(ref native_rx) = native_job_rx {
let dispatch = native_rx.lock().unwrap().try_recv().ok();
match dispatch {
Some(d) => {
native_permit = Some(d.permit);
match d.job.to_pulled_job() {
Ok(j) => Ok(j.map(|job| NextJob::Sql { flow_runners: None, job })),
Err(PulledJobResultToJobErr::MissingConcurrencyKey(jc))
| Err(PulledJobResultToJobErr::ErrorWhilePreprocessing(jc)) => {
if let Err(err) = job_completed_tx.send_job(jc, true).await {
tracing::error!(
"An error occurred while sending job completed: {:#?}",
err
)
}
Ok(None)
}
}
}
None => Ok(None),
}
} else {
match &conn {
Connection::Sql(db) => {