Compare commits
2 Commits
fix/inline
...
native-pol
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1da430b3a2 | ||
|
|
34020b5422 |
@@ -1822,11 +1822,53 @@ pub async fn run_workers(
|
||||
.expect("could not create initial worker dir");
|
||||
}
|
||||
|
||||
let native_mode = NATIVE_MODE_RESOLVED.load(std::sync::atomic::Ordering::Relaxed);
|
||||
|
||||
tracing::info!(
|
||||
"Starting {num_workers} workers and SLEEP_QUEUE={}ms",
|
||||
"Starting {num_workers} workers (native_polling={native_mode}) and SLEEP_QUEUE={}ms",
|
||||
*windmill_worker::SLEEP_QUEUE
|
||||
);
|
||||
|
||||
let native_job_rx = if native_mode && num_workers > 1 {
|
||||
let semaphore = std::sync::Arc::new(tokio::sync::Semaphore::new(num_workers));
|
||||
let (job_tx, job_rx) =
|
||||
tokio::sync::mpsc::channel::<windmill_worker::NativeJobDispatch>(num_workers);
|
||||
let native_job_rx: windmill_worker::NativeJobRx =
|
||||
std::sync::Arc::new(std::sync::Mutex::new(job_rx));
|
||||
|
||||
let first_conn = &workers[0];
|
||||
match &first_conn.conn {
|
||||
Connection::Sql(db) => {
|
||||
let poller_killpill_rx = rx.resubscribe();
|
||||
let poller_name = format!(
|
||||
"{}-poller",
|
||||
first_conn
|
||||
.worker_name
|
||||
.rsplit_once('-')
|
||||
.map_or(first_conn.worker_name.as_str(), |(prefix, _)| prefix)
|
||||
);
|
||||
let db = db.clone();
|
||||
handles.push(tokio::spawn(async move {
|
||||
windmill_worker::run_native_poller(
|
||||
&db,
|
||||
job_tx,
|
||||
semaphore,
|
||||
poller_killpill_rx,
|
||||
&poller_name,
|
||||
)
|
||||
.await;
|
||||
}));
|
||||
Some(native_job_rx)
|
||||
}
|
||||
Connection::Http(_) => {
|
||||
tracing::warn!("native polling mode is not supported with HTTP connections, falling back to individual polling");
|
||||
None
|
||||
}
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
for i in 1..(num_workers + 1) {
|
||||
let wk_conf = &workers[i as usize - 1];
|
||||
let conn1 = wk_conf.conn.clone();
|
||||
@@ -1837,6 +1879,7 @@ pub async fn run_workers(
|
||||
let tx = tx.clone();
|
||||
let base_internal_url = base_internal_url.clone();
|
||||
let hostname = hostname.clone();
|
||||
let native_job_rx = native_job_rx.clone();
|
||||
|
||||
handles.push(tokio::spawn(async move {
|
||||
if num_workers > 1 {
|
||||
@@ -1853,6 +1896,7 @@ pub async fn run_workers(
|
||||
rx,
|
||||
tx,
|
||||
&base_internal_url,
|
||||
native_job_rx,
|
||||
);
|
||||
|
||||
// #[cfg(tokio_unstable)]
|
||||
|
||||
@@ -71,7 +71,7 @@ use windmill_common::{
|
||||
worker::{
|
||||
load_env_vars, load_init_bash_from_env, load_periodic_bash_script_from_env,
|
||||
load_periodic_bash_script_interval_from_env, load_whitelist_env_vars_from_env,
|
||||
load_worker_config, reload_custom_tags_setting, store_pull_query,
|
||||
load_worker_config, reload_custom_tags_setting, store_pull_query, store_pull_query_batch,
|
||||
store_suspended_pull_query, Connection, WorkerConfig, DEFAULT_TAGS_PER_WORKSPACE,
|
||||
DEFAULT_TAGS_WORKSPACES, INDEXER_CONFIG, SCRIPT_TOKEN_EXPIRY, SMTP_CONFIG, TMP_DIR,
|
||||
WORKER_CONFIG, WORKER_GROUP,
|
||||
@@ -2268,6 +2268,7 @@ pub async fn reload_worker_config(db: &DB, tx: KillpillSender, kill_if_change: b
|
||||
tracing::info!("Reloading worker config...");
|
||||
store_suspended_pull_query(&config).await;
|
||||
store_pull_query(&config).await;
|
||||
store_pull_query_batch(&config).await;
|
||||
*wc = config
|
||||
}
|
||||
}
|
||||
|
||||
@@ -230,6 +230,7 @@ fn spawn_workers(
|
||||
}];
|
||||
windmill_common::worker::store_suspended_pull_query(&wc).await;
|
||||
windmill_common::worker::store_pull_query(&wc).await;
|
||||
windmill_common::worker::store_pull_query_batch(&wc).await;
|
||||
}
|
||||
windmill_worker::run_worker(
|
||||
&conn,
|
||||
@@ -241,6 +242,7 @@ fn spawn_workers(
|
||||
rx,
|
||||
tx2,
|
||||
&base_internal_url,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
};
|
||||
|
||||
@@ -5343,12 +5343,16 @@ async fn add_batch_jobs(
|
||||
if dedicated_worker && path.is_some() {
|
||||
windmill_common::worker::dedicated_worker_tag(&w_id, &path.clone().unwrap())
|
||||
} else {
|
||||
format!("{}", language.as_str())
|
||||
language
|
||||
.tag_str(job_kind == JobKind::Dependencies)
|
||||
.to_string()
|
||||
}
|
||||
} else if let Some(tag) = batch_info.tag {
|
||||
tag
|
||||
} else {
|
||||
format!("{}", language.as_str())
|
||||
language
|
||||
.tag_str(job_kind == JobKind::Dependencies)
|
||||
.to_string()
|
||||
};
|
||||
|
||||
let mut tx = user_db.begin(&authed).await?;
|
||||
|
||||
@@ -233,6 +233,7 @@ lazy_static::lazy_static! {
|
||||
}));
|
||||
|
||||
pub static ref WORKER_PULL_QUERIES: Arc<RwLock<Vec<String>>> = Arc::new(RwLock::new(vec![]));
|
||||
pub static ref WORKER_PULL_QUERIES_BATCH: Arc<RwLock<Vec<String>>> = Arc::new(RwLock::new(vec![]));
|
||||
pub static ref WORKER_SUSPENDED_PULL_QUERY: Arc<RwLock<String>> = Arc::new(RwLock::new("".to_string()));
|
||||
|
||||
|
||||
@@ -490,6 +491,69 @@ pub async fn store_pull_query(wc: &WorkerConfig) {
|
||||
*l = queries;
|
||||
}
|
||||
|
||||
fn format_pull_query_batch(peek: String) -> String {
|
||||
format!(
|
||||
"WITH peek AS (
|
||||
{}
|
||||
), q AS NOT MATERIALIZED (
|
||||
UPDATE v2_job_queue SET
|
||||
running = true,
|
||||
started_at = coalesce(started_at, now()),
|
||||
suspend_until = null,
|
||||
worker = $1
|
||||
WHERE id = ANY(ARRAY(SELECT id FROM peek))
|
||||
RETURNING
|
||||
id, started_at, scheduled_for,
|
||||
canceled_by, canceled_reason, worker, cache_ignore_s3_path, runnable_settings_handle
|
||||
), r AS NOT MATERIALIZED (
|
||||
UPDATE v2_job_runtime SET
|
||||
ping = now()
|
||||
WHERE id = ANY(ARRAY(SELECT id FROM q))
|
||||
) SELECT q.id, j.workspace_id, j.parent_job, j.created_by, q.started_at, q.scheduled_for,
|
||||
j.runnable_id, j.runnable_path, j.args, q.canceled_by,
|
||||
q.canceled_reason, j.kind, j.trigger, j.trigger_kind, j.permissioned_as,
|
||||
flow_status, j.script_lang,
|
||||
j.same_worker, j.pre_run_error, j.visible_to_owner,
|
||||
j.tag, j.concurrent_limit, j.concurrency_time_window_s, j.flow_innermost_root_job, j.root_job,
|
||||
j.timeout, j.flow_step_id, j.cache_ttl, q.cache_ignore_s3_path, q.runnable_settings_handle, j.priority, j.raw_code, j.raw_lock, j.raw_flow,
|
||||
j.script_entrypoint_override, j.preprocessed, COALESCE(pj.runnable_path, j.args->>'_FLOW_PATH') as parent_runnable_path,
|
||||
COALESCE(p.email, j.permissioned_as_email) as permissioned_as_email, p.username as permissioned_as_username, p.is_admin as permissioned_as_is_admin,
|
||||
p.is_operator as permissioned_as_is_operator, p.groups as permissioned_as_groups, p.folders as permissioned_as_folders, p.end_user_email as permissioned_as_end_user_email
|
||||
FROM q JOIN v2_job j ON j.id = q.id
|
||||
LEFT JOIN v2_job_status f ON f.id = q.id
|
||||
LEFT JOIN job_perms p ON p.job_id = q.id
|
||||
LEFT JOIN v2_job pj ON j.parent_job = pj.id
|
||||
",
|
||||
peek
|
||||
)
|
||||
}
|
||||
|
||||
pub fn make_pull_query_batch(tags: &[String]) -> String {
|
||||
format_pull_query_batch(format!(
|
||||
"SELECT id
|
||||
FROM v2_job_queue
|
||||
WHERE running = false AND tag IN ({}) AND scheduled_for <= now()
|
||||
ORDER BY priority DESC NULLS LAST, scheduled_for
|
||||
FOR UPDATE SKIP LOCKED
|
||||
LIMIT $2",
|
||||
tags.iter().map(|x| format!("'{x}'")).join(", ")
|
||||
))
|
||||
}
|
||||
|
||||
pub async fn store_pull_query_batch(wc: &WorkerConfig) {
|
||||
let mut queries = vec![];
|
||||
for tags in wc.priority_tags_sorted.iter() {
|
||||
if tags.tags.is_empty() {
|
||||
tracing::error!("Empty tags in priority tags, skipping");
|
||||
continue;
|
||||
}
|
||||
let query = make_pull_query_batch(&tags.tags);
|
||||
queries.push(query);
|
||||
}
|
||||
let mut l = WORKER_PULL_QUERIES_BATCH.write().await;
|
||||
*l = queries;
|
||||
}
|
||||
|
||||
pub const TMP_DIR: &str = "/tmp/windmill";
|
||||
pub const TMP_LOGS_DIR: &str = concatcp!(TMP_DIR, "/logs");
|
||||
|
||||
|
||||
@@ -74,7 +74,7 @@ use windmill_common::{
|
||||
utils::{not_found_if_none, report_critical_error, StripPath, WarnAfterExt},
|
||||
worker::{
|
||||
to_raw_value, CLOUD_HOSTED, DISABLE_FLOW_SCRIPT, NO_LOGS, WORKER_PULL_QUERIES,
|
||||
WORKER_SUSPENDED_PULL_QUERY,
|
||||
WORKER_PULL_QUERIES_BATCH, WORKER_SUSPENDED_PULL_QUERY,
|
||||
},
|
||||
DB, METRICS_ENABLED,
|
||||
};
|
||||
@@ -3434,6 +3434,65 @@ async fn pull_single_job_and_mark_as_running_no_concurrency_limit<'c>(
|
||||
Ok(job_and_suspended)
|
||||
}
|
||||
|
||||
pub async fn pull_batch(
|
||||
db: &Pool<Postgres>,
|
||||
worker_name: &str,
|
||||
limit: i32,
|
||||
) -> windmill_common::error::Result<Vec<PulledJob>> {
|
||||
let queries = WORKER_PULL_QUERIES_BATCH.read().await;
|
||||
|
||||
if queries.is_empty() {
|
||||
tracing::warn!("No batch pull queries available");
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
let mut results: Vec<PulledJob> = Vec::new();
|
||||
let mut remaining = limit;
|
||||
|
||||
for query in queries.iter() {
|
||||
if remaining <= 0 {
|
||||
break;
|
||||
}
|
||||
|
||||
let jobs = timeout(
|
||||
Duration::from_secs(15),
|
||||
sqlx::query_as::<_, PulledJob>(query)
|
||||
.bind(worker_name)
|
||||
.bind(remaining)
|
||||
.fetch_all(db),
|
||||
)
|
||||
.await??;
|
||||
|
||||
for job in jobs {
|
||||
if job.is_flow() || job.is_dependency() {
|
||||
let per_workspace = per_workspace_tag(&job.workspace_id).await;
|
||||
let base_tag = if job.is_flow() {
|
||||
"flow".to_string()
|
||||
} else {
|
||||
"dependency".to_string()
|
||||
};
|
||||
let tag = if per_workspace {
|
||||
format!("{}-{}", base_tag, job.workspace_id)
|
||||
} else {
|
||||
base_tag
|
||||
};
|
||||
sqlx::query!(
|
||||
"UPDATE v2_job_queue SET tag = $1, running = false WHERE id = $2",
|
||||
tag,
|
||||
job.id
|
||||
)
|
||||
.execute(db)
|
||||
.await?;
|
||||
continue;
|
||||
}
|
||||
remaining -= 1;
|
||||
results.push(job);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(results)
|
||||
}
|
||||
|
||||
pub async fn custom_concurrency_key(
|
||||
db: &Pool<Postgres>,
|
||||
job_id: &Uuid,
|
||||
@@ -5373,15 +5432,7 @@ async fn push_inner<'c, 'd>(
|
||||
language
|
||||
.as_ref()
|
||||
.map(|x| {
|
||||
let tag_lang = if x == &ScriptLang::Bunnative {
|
||||
if job_kind == JobKind::Dependencies {
|
||||
ScriptLang::Bun.as_str()
|
||||
} else {
|
||||
ScriptLang::Nativets.as_str()
|
||||
}
|
||||
} else {
|
||||
x.as_str()
|
||||
};
|
||||
let tag_lang = x.tag_str(job_kind == JobKind::Dependencies);
|
||||
if per_workspace {
|
||||
format!("{}-{}", tag_lang, workspace_id)
|
||||
} else {
|
||||
|
||||
@@ -401,6 +401,7 @@ pub fn spawn_test_worker(
|
||||
}];
|
||||
windmill_common::worker::store_suspended_pull_query(&wc).await;
|
||||
windmill_common::worker::store_pull_query(&wc).await;
|
||||
windmill_common::worker::store_pull_query_batch(&wc).await;
|
||||
}
|
||||
windmill_worker::run_worker(
|
||||
&conn,
|
||||
@@ -412,6 +413,7 @@ pub fn spawn_test_worker(
|
||||
rx,
|
||||
tx2,
|
||||
&base_internal_url,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
};
|
||||
|
||||
@@ -88,6 +88,18 @@ impl ScriptLang {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn tag_str(&self, is_dependency: bool) -> &'static str {
|
||||
if self == &ScriptLang::Bunnative {
|
||||
if is_dependency {
|
||||
ScriptLang::Bun.as_str()
|
||||
} else {
|
||||
ScriptLang::Nativets.as_str()
|
||||
}
|
||||
} else {
|
||||
self.as_str()
|
||||
}
|
||||
}
|
||||
|
||||
pub fn as_dependencies_filename(&self) -> Option<String> {
|
||||
use ScriptLang::*;
|
||||
Some(
|
||||
@@ -105,15 +117,15 @@ impl ScriptLang {
|
||||
pub fn is_native(&self) -> bool {
|
||||
matches!(
|
||||
self,
|
||||
ScriptLang::Bunnative |
|
||||
ScriptLang::Nativets |
|
||||
ScriptLang::Postgresql |
|
||||
ScriptLang::Mysql |
|
||||
ScriptLang::Graphql |
|
||||
ScriptLang::Snowflake |
|
||||
ScriptLang::Mssql |
|
||||
ScriptLang::Bigquery |
|
||||
ScriptLang::OracleDB
|
||||
ScriptLang::Bunnative
|
||||
| ScriptLang::Nativets
|
||||
| ScriptLang::Postgresql
|
||||
| ScriptLang::Mysql
|
||||
| ScriptLang::Graphql
|
||||
| ScriptLang::Snowflake
|
||||
| ScriptLang::Mssql
|
||||
| ScriptLang::Bigquery
|
||||
| ScriptLang::OracleDB
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
@@ -72,6 +72,7 @@ use windmill_parser::MainArgSignature;
|
||||
use windmill_queue::DedicatedWorkerJob;
|
||||
use windmill_queue::FlowRunners;
|
||||
use windmill_queue::MiniCompletedJob;
|
||||
use windmill_queue::PulledJobResult;
|
||||
use windmill_queue::PulledJobResultToJobErr;
|
||||
|
||||
use uuid::Uuid;
|
||||
@@ -89,8 +90,8 @@ use windmill_common::{
|
||||
};
|
||||
|
||||
use windmill_queue::{
|
||||
append_logs, canceled_job_to_result, empty_result, get_same_worker_job, pull, push_init_job,
|
||||
push_periodic_bash_job, CanceledBy, JobAndPerms, JobCompleted, MiniPulledJob,
|
||||
append_logs, canceled_job_to_result, empty_result, get_same_worker_job, pull, pull_batch,
|
||||
push_init_job, push_periodic_bash_job, CanceledBy, JobAndPerms, JobCompleted, MiniPulledJob,
|
||||
PrecomputedAgentInfo, PulledJob, SameWorkerPayload, HTTP_CLIENT, INIT_SCRIPT_TAG,
|
||||
PERIODIC_SCRIPT_TAG,
|
||||
};
|
||||
@@ -256,6 +257,13 @@ const VACUUM_PERIOD: u32 = 10000;
|
||||
|
||||
pub const MAX_BUFFERED_DEDICATED_JOBS: usize = 3;
|
||||
|
||||
pub struct NativeJobDispatch {
|
||||
pub job: PulledJobResult,
|
||||
pub permit: tokio::sync::OwnedSemaphorePermit,
|
||||
}
|
||||
|
||||
pub type NativeJobRx = Arc<std::sync::Mutex<tokio::sync::mpsc::Receiver<NativeJobDispatch>>>;
|
||||
|
||||
/// Per-language OTEL tracing proxy configuration.
|
||||
/// Default languages are configured in frontend instanceSettings.ts
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
||||
@@ -1348,6 +1356,87 @@ pub async fn create_job_dir(worker_directory: &str, job_id: impl Display) -> Str
|
||||
job_dir_path
|
||||
}
|
||||
|
||||
pub async fn run_native_poller(
|
||||
db: &sqlx::Pool<sqlx::Postgres>,
|
||||
job_tx: tokio::sync::mpsc::Sender<NativeJobDispatch>,
|
||||
semaphore: Arc<tokio::sync::Semaphore>,
|
||||
mut killpill_rx: tokio::sync::broadcast::Receiver<()>,
|
||||
worker_name: &str,
|
||||
) {
|
||||
loop {
|
||||
match killpill_rx.try_recv() {
|
||||
Ok(_) | Err(tokio::sync::broadcast::error::TryRecvError::Closed) => {
|
||||
tracing::info!(worker = %worker_name, "native poller: killpill received, exiting");
|
||||
break;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
let available = semaphore.available_permits();
|
||||
if available == 0 {
|
||||
tokio::time::sleep(Duration::from_millis(50)).await;
|
||||
continue;
|
||||
}
|
||||
|
||||
let jobs = match timeout(
|
||||
Duration::from_secs(30),
|
||||
pull_batch(db, worker_name, available as i32),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(Ok(jobs)) => jobs,
|
||||
Ok(Err(e)) => {
|
||||
tracing::error!(worker = %worker_name, "native poller: pull_batch error: {e}");
|
||||
tokio::time::sleep(Duration::from_millis(*SLEEP_QUEUE)).await;
|
||||
continue;
|
||||
}
|
||||
Err(_) => {
|
||||
tracing::error!(worker = %worker_name, "native poller: pull_batch timed out");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
if jobs.is_empty() {
|
||||
tokio::time::sleep(Duration::from_millis(*SLEEP_QUEUE)).await;
|
||||
continue;
|
||||
}
|
||||
|
||||
for pulled_job in jobs {
|
||||
let permit = semaphore
|
||||
.clone()
|
||||
.try_acquire_owned()
|
||||
.expect("permits can only increase between available_permits() and here");
|
||||
|
||||
let mut job = PulledJobResult {
|
||||
job: Some(pulled_job),
|
||||
suspended: false,
|
||||
missing_concurrency_key: false,
|
||||
error_while_preprocessing: None,
|
||||
};
|
||||
|
||||
if let Err(e) = timeout(
|
||||
core::time::Duration::from_secs(10),
|
||||
job.maybe_apply_debouncing(db),
|
||||
)
|
||||
.await
|
||||
.map_err(error::Error::from)
|
||||
.and_then(|r| r)
|
||||
{
|
||||
job.error_while_preprocessing = Some(e.to_string());
|
||||
}
|
||||
|
||||
if job_tx
|
||||
.send(NativeJobDispatch { job, permit })
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
tracing::info!(worker = %worker_name, "native poller: channel closed, exiting");
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run_worker(
|
||||
conn: &Connection,
|
||||
hostname: &str,
|
||||
@@ -1358,6 +1447,7 @@ pub async fn run_worker(
|
||||
mut killpill_rx: tokio::sync::broadcast::Receiver<()>,
|
||||
killpill_tx: KillpillSender,
|
||||
base_internal_url: &str,
|
||||
native_job_rx: Option<NativeJobRx>,
|
||||
) {
|
||||
#[cfg(not(feature = "enterprise"))]
|
||||
if is_sandboxing_enabled() {
|
||||
@@ -1818,10 +1908,13 @@ pub async fn run_worker(
|
||||
let mut last_30jobs_suspended = 0;
|
||||
let mut last_suspend_first = Instant::now();
|
||||
let mut killed_but_draining_same_worker_jobs = false;
|
||||
let mut native_permit: Option<tokio::sync::OwnedSemaphorePermit> = None;
|
||||
|
||||
let mut killpill_rx2 = killpill_rx.resubscribe();
|
||||
|
||||
loop {
|
||||
drop(native_permit.take());
|
||||
|
||||
let last_processing_duration_secs = last_processing_duration.load(Ordering::SeqCst);
|
||||
if last_processing_duration_secs > 5 {
|
||||
let sleep_duration = if last_processing_duration_secs > 10 {
|
||||
@@ -2057,6 +2150,27 @@ pub async fn run_worker(
|
||||
tokio::time::sleep(Duration::from_millis(200)).await;
|
||||
continue;
|
||||
}
|
||||
} else if let Some(ref native_rx) = native_job_rx {
|
||||
let dispatch = native_rx.lock().unwrap().try_recv().ok();
|
||||
match dispatch {
|
||||
Some(d) => {
|
||||
native_permit = Some(d.permit);
|
||||
match d.job.to_pulled_job() {
|
||||
Ok(j) => Ok(j.map(|job| NextJob::Sql { flow_runners: None, job })),
|
||||
Err(PulledJobResultToJobErr::MissingConcurrencyKey(jc))
|
||||
| Err(PulledJobResultToJobErr::ErrorWhilePreprocessing(jc)) => {
|
||||
if let Err(err) = job_completed_tx.send_job(jc, true).await {
|
||||
tracing::error!(
|
||||
"An error occurred while sending job completed: {:#?}",
|
||||
err
|
||||
)
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
}
|
||||
None => Ok(None),
|
||||
}
|
||||
} else {
|
||||
match &conn {
|
||||
Connection::Sql(db) => {
|
||||
|
||||
Reference in New Issue
Block a user