feat: native mode (#7939)

* feat: native mode

* improve

* fix build

* review fixes

* tracing nit
This commit is contained in:
hugocasa
2026-02-17 00:36:41 +01:00
committed by GitHub
parent 0940d70a2b
commit 535e108cbf
19 changed files with 241 additions and 53 deletions

View File

@@ -257,6 +257,7 @@ On self-hosted instances, you might want to import all the approved resource typ
| BASE_URL | http://localhost:8000 | The base url that is exposed publicly to access your instance. Is overriden by the instance settings if any. | Server |
| ZOMBIE_JOB_TIMEOUT | 30 | The timeout after which a job is considered to be zombie if the worker did not send pings about processing the job (every server check for zombie jobs every 30s) | Server |
| RESTART_ZOMBIE_JOBS | true | If true then a zombie job is restarted (in-place with the same uuid and some logs), if false the zombie job is failed | Server |
| NATIVE_MODE | false | Enable native mode: sets NUM_WORKERS=8, rejects non-native jobs (nativets, postgresql, mysql, etc.) | Worker |
| SLEEP_QUEUE | 50 | The number of ms to sleep in between the last check for new jobs in the DB. It is multiplied by NUM_WORKERS such that in average, for one worker instance, there is one pull every SLEEP_QUEUE ms. | Worker |
| KEEP_JOB_DIR | false | Keep the job directory after the job is done. Useful for debugging. | Worker |
| LICENSE_KEY (EE only) | None | License key checked at startup for the Enterprise Edition of Windmill | Worker |

View File

@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "INSERT INTO worker_ping (worker_instance, worker, ip, custom_tags, worker_group, dedicated_worker, dedicated_workers, wm_version, vcpus, memory, job_isolation) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT (worker)\n DO UPDATE set ip = EXCLUDED.ip, custom_tags = EXCLUDED.custom_tags, worker_group = EXCLUDED.worker_group, dedicated_workers = EXCLUDED.dedicated_workers",
"query": "INSERT INTO worker_ping (worker_instance, worker, ip, custom_tags, worker_group, dedicated_worker, dedicated_workers, wm_version, vcpus, memory, job_isolation, native_mode) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) ON CONFLICT (worker)\n DO UPDATE set ip = EXCLUDED.ip, custom_tags = EXCLUDED.custom_tags, worker_group = EXCLUDED.worker_group, dedicated_workers = EXCLUDED.dedicated_workers, native_mode = EXCLUDED.native_mode",
"describe": {
"columns": [],
"parameters": {
@@ -15,10 +15,11 @@
"Varchar",
"Int8",
"Int8",
"Text"
"Text",
"Bool"
]
},
"nullable": []
},
"hash": "97c61b6a9a5112ea484565236959a544511d5d501fb737da8110a8725b883465"
"hash": "298fa4f8eb05b4c3f33b608b0cdb6ed918af2df012de33acb3befd3fcccbc257"
}

View File

@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "SELECT worker, worker_instance, EXTRACT(EPOCH FROM (now() - ping_at))::integer as last_ping, started_at, ip, jobs_executed,\n CASE WHEN $4 IS TRUE THEN current_job_id ELSE NULL END as last_job_id, CASE WHEN $4 IS TRUE THEN current_job_workspace_id ELSE NULL END as last_job_workspace_id,\n custom_tags, worker_group, wm_version, occupancy_rate, occupancy_rate_15s, occupancy_rate_5m, occupancy_rate_30m, memory, vcpus, memory_usage, wm_memory_usage, job_isolation\n FROM worker_ping\n WHERE ($1::integer IS NULL AND ping_at > now() - interval '5 minute') OR (ping_at > now() - ($1 || ' seconds')::interval)\n ORDER BY ping_at desc LIMIT $2 OFFSET $3",
"query": "SELECT worker, worker_instance, EXTRACT(EPOCH FROM (now() - ping_at))::integer as last_ping, started_at, ip, jobs_executed,\n CASE WHEN $4 IS TRUE THEN current_job_id ELSE NULL END as last_job_id, CASE WHEN $4 IS TRUE THEN current_job_workspace_id ELSE NULL END as last_job_workspace_id,\n custom_tags, worker_group, wm_version, occupancy_rate, occupancy_rate_15s, occupancy_rate_5m, occupancy_rate_30m, memory, vcpus, memory_usage, wm_memory_usage, job_isolation, native_mode\n FROM worker_ping\n WHERE ($1::integer IS NULL AND ping_at > now() - interval '5 minute') OR (ping_at > now() - ($1 || ' seconds')::interval)\n ORDER BY ping_at desc LIMIT $2 OFFSET $3",
"describe": {
"columns": [
{
@@ -102,6 +102,11 @@
"ordinal": 19,
"name": "job_isolation",
"type_info": "Text"
},
{
"ordinal": 20,
"name": "native_mode",
"type_info": "Bool"
}
],
"parameters": {
@@ -132,8 +137,9 @@
true,
true,
true,
true
true,
false
]
},
"hash": "771a858a4b7ca41b6787e61f5a4a5c9c4d48fd213852e2f997cd4b2420580d30"
"hash": "68bca8f839e47705b11d312ee874eceaa3d1d24d9053ad4aea94b9f8465585ca"
}

View File

@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "UPDATE worker_ping SET ping_at = now(), jobs_executed = $1, custom_tags = $2,\n occupancy_rate = $3, memory_usage = $4, wm_memory_usage = $5, vcpus = COALESCE($7, vcpus),\n memory = COALESCE($8, memory), occupancy_rate_15s = $9, occupancy_rate_5m = $10, occupancy_rate_30m = $11 WHERE worker = $6",
"query": "UPDATE worker_ping SET ping_at = now(), jobs_executed = $1, custom_tags = $2,\n occupancy_rate = $3, memory_usage = $4, wm_memory_usage = $5, vcpus = COALESCE($7, vcpus),\n memory = COALESCE($8, memory), occupancy_rate_15s = $9, occupancy_rate_5m = $10, occupancy_rate_30m = $11, native_mode = $12 WHERE worker = $6",
"describe": {
"columns": [],
"parameters": {
@@ -15,10 +15,11 @@
"Int8",
"Float4",
"Float4",
"Float4"
"Float4",
"Bool"
]
},
"nullable": []
},
"hash": "aa523c363186575b4bd2537b8e2430e6938e7cc35f8c9e2d1c5459a85443cbdd"
"hash": "a41c4cbaffdb714e4a963557de5a4011744d684eb24e03cb4beae6a512613159"
}

View File

@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
"query": "SELECT worker, worker_instance, worker_group, vcpus, memory, ping_at, started_at, custom_tags, occupancy_rate_15s, occupancy_rate_5m, occupancy_rate_30m FROM worker_ping WHERE ping_at > now() - interval '30 days' ORDER BY started_at",
"query": "SELECT worker, worker_instance, worker_group, vcpus, memory, ping_at, started_at, custom_tags, occupancy_rate_15s, occupancy_rate_5m, occupancy_rate_30m, native_mode FROM worker_ping WHERE ping_at > now() - interval '30 days' ORDER BY started_at",
"describe": {
"columns": [
{
@@ -57,6 +57,11 @@
"ordinal": 10,
"name": "occupancy_rate_30m",
"type_info": "Float4"
},
{
"ordinal": 11,
"name": "native_mode",
"type_info": "Bool"
}
],
"parameters": {
@@ -73,8 +78,9 @@
true,
true,
true,
true
true,
false
]
},
"hash": "2b3b634b15eb58b95ce26b5a591258b54fb7bf21ae85e7a390ad73489c2247ac"
"hash": "dcc6928bc273fcbe52bcef43f9d06d8bb8c68a1b04b3c2cce7491dde5d727446"
}

View File

@@ -1 +1 @@
9f6e1e533df7711600ec2b8d5f0c958448db1a20
06207a2083383e0320da287bdf0268454db515de

View File

@@ -0,0 +1 @@
ALTER TABLE worker_ping DROP COLUMN IF EXISTS native_mode;

View File

@@ -0,0 +1 @@
ALTER TABLE worker_ping ADD COLUMN IF NOT EXISTS native_mode BOOLEAN NOT NULL DEFAULT false;

View File

@@ -47,8 +47,8 @@ use windmill_common::{
JWT_SECRET_SETTING, KEEP_JOB_DIR_SETTING, LICENSE_KEY_SETTING, MAVEN_REPOS_SETTING,
MAVEN_SETTINGS_XML_SETTING, MONITOR_LOGS_ON_OBJECT_STORE_SETTING, NO_DEFAULT_MAVEN_SETTING,
NPM_CONFIG_REGISTRY_SETTING, NUGET_CONFIG_SETTING, OAUTH_SETTING, OTEL_SETTING,
OTEL_TRACING_PROXY_SETTING, PIP_INDEX_URL_SETTING,
POWERSHELL_REPO_PAT_SETTING, POWERSHELL_REPO_URL_SETTING, REQUEST_SIZE_LIMIT_SETTING,
OTEL_TRACING_PROXY_SETTING, PIP_INDEX_URL_SETTING, POWERSHELL_REPO_PAT_SETTING,
POWERSHELL_REPO_URL_SETTING, REQUEST_SIZE_LIMIT_SETTING,
REQUIRE_PREEXISTING_USER_FOR_OAUTH_SETTING, RETENTION_PERIOD_SECS_SETTING,
RUBY_REPOS_SETTING, SAML_METADATA_SETTING, SCIM_TOKEN_SETTING, SMTP_SETTING, TEAMS_SETTING,
TIMEOUT_WAIT_RESULT_SETTING, UV_INDEX_STRATEGY_SETTING,
@@ -61,8 +61,8 @@ use windmill_common::{
MODE_AND_ADDONS,
},
worker::{
reload_custom_tags_setting, Connection, HUB_CACHE_DIR, HUB_RT_CACHE_DIR, TMP_DIR,
TMP_LOGS_DIR, WORKER_GROUP,
is_native_mode_from_env, reload_custom_tags_setting, Connection, HUB_CACHE_DIR,
HUB_RT_CACHE_DIR, NATIVE_MODE_RESOLVED, TMP_DIR, TMP_LOGS_DIR, WORKER_GROUP,
},
KillpillSender, DEFAULT_HUB_BASE_URL, METRICS_ENABLED,
};
@@ -653,6 +653,9 @@ async fn windmill_main() -> anyhow::Result<()> {
#[allow(unused_mut)]
let mut num_workers = if mode == Mode::Server || mode == Mode::Indexer || mode == Mode::MCP {
0
} else if is_native_mode_from_env() {
println!("Native mode enabled: forcing NUM_WORKERS=8");
8
} else {
std::env::var("NUM_WORKERS")
.ok()
@@ -660,11 +663,21 @@ async fn windmill_main() -> anyhow::Result<()> {
.unwrap_or(DEFAULT_NUM_WORKERS as i32)
};
// TODO: maybe gate behind debug_assertions?
if num_workers > 1 && !std::env::var("WORKER_GROUP").is_ok_and(|x| x == "native") {
println!(
"We STRONGLY recommend using at most 1 worker per container, use at your own risks"
);
if num_workers > 1 && !is_native_mode_from_env() {
if std::env::var("I_ACK_NUM_WORKERS_IS_UNSAFE").is_ok_and(|x| x == "1" || x == "true") {
println!(
"WARNING: Running with NUM_WORKERS={} without native mode. \
This is not recommended. Use at your own risk.",
num_workers
);
} else {
eprintln!(
"WARNING: NUM_WORKERS={} > 1 is only safe for native workers. \
Falling back to NUM_WORKERS=1. Set NATIVE_MODE=true for native-only workers.",
num_workers
);
num_workers = 1;
}
}
let server_mode = !std::env::var("DISABLE_SERVER")
@@ -924,6 +937,16 @@ Windmill Community Edition {GIT_VERSION}
)
.await;
// native_mode may also be set via DB worker group config (not just env).
// NATIVE_MODE_RESOLVED is updated by load_worker_config during initial_load.
if worker_mode
&& !is_native_mode_from_env()
&& NATIVE_MODE_RESOLVED.load(std::sync::atomic::Ordering::Relaxed)
{
num_workers = 8;
tracing::info!("Native mode detected from worker config: forcing NUM_WORKERS=8");
}
monitor_db(
&conn,
&base_internal_url,

View File

@@ -239,11 +239,16 @@ pub async fn initial_load(
Connection::Http(_) => {
// TODO: reload worker config from http
let mut config = WORKER_CONFIG.write().await;
let worker_tags = DECODED_AGENT_TOKEN
.as_ref()
.map(|x| x.tags.clone())
.unwrap_or_default();
// we only check from env as native_mode is not stored in the token
let native_mode = windmill_common::worker::is_native_mode_from_env();
windmill_common::worker::NATIVE_MODE_RESOLVED
.store(native_mode, std::sync::atomic::Ordering::Relaxed);
*config = WorkerConfig {
worker_tags: DECODED_AGENT_TOKEN
.as_ref()
.map(|x| x.tags.clone())
.unwrap_or_default(),
worker_tags,
env_vars: load_env_vars(
load_whitelist_env_vars_from_env(),
&std::collections::HashMap::new(),
@@ -257,6 +262,7 @@ pub async fn initial_load(
cache_clear: None,
additional_python_paths: None,
pip_local_dependencies: None,
native_mode,
};
}
}
@@ -2242,6 +2248,11 @@ pub async fn reload_worker_config(db: &DB, tx: KillpillSender, kill_if_change: b
tracing::info!("Periodic script interval config changed, sending killpill. Expecting to be restarted by supervisor.");
let _ = tx.send();
}
if (*wc).native_mode != config.native_mode {
tracing::info!("Native mode config changed, sending killpill. Expecting to be restarted by supervisor.");
let _ = tx.send();
}
}
drop(wc);

View File

@@ -24,7 +24,7 @@ use windmill_common::{
DB,
};
use windmill_api_auth::{ApiAuthed, require_super_admin};
use windmill_api_auth::{require_super_admin, ApiAuthed};
pub fn global_service() -> Router {
Router::new()
@@ -76,6 +76,8 @@ struct WorkerPing {
wm_memory_usage: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
job_isolation: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
native_mode: Option<bool>,
}
// #[derive(Serialize, Deserialize)]
@@ -108,7 +110,7 @@ async fn list_worker_pings(
WorkerPing,
"SELECT worker, worker_instance, EXTRACT(EPOCH FROM (now() - ping_at))::integer as last_ping, started_at, ip, jobs_executed,
CASE WHEN $4 IS TRUE THEN current_job_id ELSE NULL END as last_job_id, CASE WHEN $4 IS TRUE THEN current_job_workspace_id ELSE NULL END as last_job_workspace_id,
custom_tags, worker_group, wm_version, occupancy_rate, occupancy_rate_15s, occupancy_rate_5m, occupancy_rate_30m, memory, vcpus, memory_usage, wm_memory_usage, job_isolation
custom_tags, worker_group, wm_version, occupancy_rate, occupancy_rate_15s, occupancy_rate_5m, occupancy_rate_30m, memory, vcpus, memory_usage, wm_memory_usage, job_isolation, native_mode
FROM worker_ping
WHERE ($1::integer IS NULL AND ping_at > now() - interval '5 minute') OR (ping_at > now() - ($1 || ' seconds')::interval)
ORDER BY ping_at desc LIMIT $2 OFFSET $3",

View File

@@ -22124,6 +22124,8 @@ components:
type: number
job_isolation:
type: string
native_mode:
type: boolean
required:
- worker
- worker_instance

View File

@@ -156,6 +156,8 @@ lazy_static::lazy_static! {
pub static ref NO_LOGS: bool = std::env::var("NO_LOGS").ok().is_some_and(|x| x == "1" || x == "true");
pub static ref NATIVE_MODE: bool = std::env::var("NATIVE_MODE").ok().is_some_and(|x| x == "1" || x == "true");
pub static ref CGROUP_V2_PATH_RE: Regex = Regex::new(r#"(?m)^0::(/.*)$"#).unwrap();
pub static ref CGROUP_V2_CPU_RE: Regex = Regex::new(r#"(?m)^(\d+) \S+$"#).unwrap();
pub static ref CGROUP_V1_INACTIVE_FILE_RE: Regex = Regex::new(r#"(?m)^total_inactive_file (\d+)$"#).unwrap();
@@ -227,6 +229,7 @@ lazy_static::lazy_static! {
additional_python_paths: Default::default(),
pip_local_dependencies: Default::default(),
env_vars: Default::default(),
native_mode: false,
}));
pub static ref WORKER_PULL_QUERIES: Arc<RwLock<Vec<String>>> = Arc::new(RwLock::new(vec![]));
@@ -273,6 +276,17 @@ lazy_static::lazy_static! {
pub const ROOT_CACHE_NOMOUNT_DIR: &str = concatcp!(TMP_DIR, "/cache_nomount/");
/// Whether native mode is forced by the environment (NATIVE_MODE=true env var or WORKER_GROUP=native).
/// This does NOT account for native_mode set in the DB worker group config — for that, read
/// `WORKER_CONFIG.native_mode` which combines all sources.
pub fn is_native_mode_from_env() -> bool {
*NATIVE_MODE || *WORKER_GROUP == "native"
}
/// Cached resolved native mode flag, updated when worker config is reloaded.
/// Use this for hot-path checks (e.g. per-job dispatch) to avoid read-locking WORKER_CONFIG.
pub static NATIVE_MODE_RESOLVED: AtomicBool = AtomicBool::new(false);
pub static MIN_VERSION_IS_LATEST: AtomicBool = AtomicBool::new(false);
#[derive(Clone)]
pub struct HttpClient {
@@ -1248,6 +1262,7 @@ pub struct Ping {
pub occupancy_rate_5m: Option<f32>,
pub occupancy_rate_30m: Option<f32>,
pub job_isolation: Option<String>,
pub native_mode: Option<bool>,
pub ping_type: PingType,
}
pub async fn update_ping_http(
@@ -1271,6 +1286,7 @@ pub async fn update_ping_http(
insert_ping.occupancy_rate_15s,
insert_ping.occupancy_rate_5m,
insert_ping.occupancy_rate_30m,
insert_ping.native_mode.unwrap_or(false),
db,
)
.await?
@@ -1297,6 +1313,7 @@ pub async fn update_ping_http(
insert_ping.vcpus,
insert_ping.memory,
insert_ping.job_isolation,
insert_ping.native_mode.unwrap_or(false),
db,
)
.await?;
@@ -1428,11 +1445,12 @@ pub async fn insert_ping_query(
vcpus: Option<i64>,
memory: Option<i64>,
job_isolation: Option<String>,
native_mode: bool,
db: &DB,
) -> anyhow::Result<()> {
sqlx::query!(
"INSERT INTO worker_ping (worker_instance, worker, ip, custom_tags, worker_group, dedicated_worker, dedicated_workers, wm_version, vcpus, memory, job_isolation) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT (worker)
DO UPDATE set ip = EXCLUDED.ip, custom_tags = EXCLUDED.custom_tags, worker_group = EXCLUDED.worker_group, dedicated_workers = EXCLUDED.dedicated_workers",
"INSERT INTO worker_ping (worker_instance, worker, ip, custom_tags, worker_group, dedicated_worker, dedicated_workers, wm_version, vcpus, memory, job_isolation, native_mode) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) ON CONFLICT (worker)
DO UPDATE set ip = EXCLUDED.ip, custom_tags = EXCLUDED.custom_tags, worker_group = EXCLUDED.worker_group, dedicated_workers = EXCLUDED.dedicated_workers, native_mode = EXCLUDED.native_mode",
worker_instance,
worker_name,
ip,
@@ -1443,7 +1461,8 @@ pub async fn insert_ping_query(
version,
vcpus,
memory,
job_isolation.as_deref()
job_isolation.as_deref(),
native_mode,
)
.execute(db)
.await?;
@@ -1534,12 +1553,13 @@ pub async fn update_worker_ping_main_loop_query(
occupancy_rate_15s: Option<f32>,
occupancy_rate_5m: Option<f32>,
occupancy_rate_30m: Option<f32>,
native_mode: bool,
db: &DB,
) -> anyhow::Result<()> {
timeout(Duration::from_secs(10), sqlx::query!(
"UPDATE worker_ping SET ping_at = now(), jobs_executed = $1, custom_tags = $2,
occupancy_rate = $3, memory_usage = $4, wm_memory_usage = $5, vcpus = COALESCE($7, vcpus),
memory = COALESCE($8, memory), occupancy_rate_15s = $9, occupancy_rate_5m = $10, occupancy_rate_30m = $11 WHERE worker = $6",
memory = COALESCE($8, memory), occupancy_rate_15s = $9, occupancy_rate_5m = $10, occupancy_rate_30m = $11, native_mode = $12 WHERE worker = $6",
jobs_executed,
tags,
occupancy_rate,
@@ -1551,6 +1571,7 @@ pub async fn update_worker_ping_main_loop_query(
occupancy_rate_15s,
occupancy_rate_5m,
occupancy_rate_30m,
native_mode,
)
.execute(db))
.await??;
@@ -1789,6 +1810,9 @@ pub async fn load_worker_config(
}
}
let native_mode = is_native_mode_from_env() || config.native_mode.unwrap_or(false);
NATIVE_MODE_RESOLVED.store(native_mode, std::sync::atomic::Ordering::Relaxed);
Ok(WorkerConfig {
worker_tags,
priority_tags_sorted,
@@ -1808,6 +1832,7 @@ pub async fn load_worker_config(
.additional_python_paths
.or_else(|| load_additional_python_paths_from_env()),
env_vars: resolved_env_vars,
native_mode,
})
}
@@ -1896,6 +1921,7 @@ pub struct WorkerConfigOpt {
pub pip_local_dependencies: Option<Vec<String>>,
pub env_vars_static: Option<HashMap<String, String>>,
pub env_vars_allowlist: Option<Vec<String>>,
pub native_mode: Option<bool>,
}
impl Default for WorkerConfigOpt {
@@ -1913,6 +1939,7 @@ impl Default for WorkerConfigOpt {
pip_local_dependencies: Default::default(),
env_vars_static: Default::default(),
env_vars_allowlist: Default::default(),
native_mode: Default::default(),
}
}
}
@@ -1930,12 +1957,13 @@ pub struct WorkerConfig {
pub additional_python_paths: Option<Vec<String>>,
pub pip_local_dependencies: Option<Vec<String>>,
pub env_vars: HashMap<String, String>,
pub native_mode: bool,
}
impl std::fmt::Debug for WorkerConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "WorkerConfig {{ worker_tags: {:?}, priority_tags_sorted: {:?}, dedicated_worker: {:?}, dedicated_workers: {:?}, init_bash: {:?}, periodic_script_bash: {:?}, periodic_script_interval_seconds: {:?}, cache_clear: {:?}, additional_python_paths: {:?}, pip_local_dependencies: {:?}, env_vars: {:?} }}",
self.worker_tags, self.priority_tags_sorted, self.dedicated_worker, self.dedicated_workers, self.init_bash, self.periodic_script_bash, self.periodic_script_interval_seconds, self.cache_clear, self.additional_python_paths, self.pip_local_dependencies, self.env_vars.iter().map(|(k, v)| format!("{}: {}{} ({} chars)", k, &v[..3.min(v.len())], "***", v.len())).collect::<Vec<String>>().join(", "))
write!(f, "WorkerConfig {{ worker_tags: {:?}, priority_tags_sorted: {:?}, dedicated_worker: {:?}, dedicated_workers: {:?}, init_bash: {:?}, periodic_script_bash: {:?}, periodic_script_interval_seconds: {:?}, cache_clear: {:?}, additional_python_paths: {:?}, pip_local_dependencies: {:?}, env_vars: {:?}, native_mode: {:?} }}",
self.worker_tags, self.priority_tags_sorted, self.dedicated_worker, self.dedicated_workers, self.init_bash, self.periodic_script_bash, self.periodic_script_interval_seconds, self.cache_clear, self.additional_python_paths, self.pip_local_dependencies, self.env_vars.iter().map(|(k, v)| format!("{}: {}{} ({} chars)", k, &v[..3.min(v.len())], "***", v.len())).collect::<Vec<String>>().join(", "), self.native_mode)
}
}

View File

@@ -102,6 +102,21 @@ impl ScriptLang {
)
}
pub fn is_native(&self) -> bool {
matches!(
self,
ScriptLang::Bunnative |
ScriptLang::Nativets |
ScriptLang::Postgresql |
ScriptLang::Mysql |
ScriptLang::Graphql |
ScriptLang::Snowflake |
ScriptLang::Mssql |
ScriptLang::Bigquery |
ScriptLang::OracleDB
)
}
pub fn as_comment_lit(&self) -> String {
use ScriptLang::*;
match self {
@@ -142,9 +157,7 @@ impl FromStr for ScriptLang {
"java" => ScriptLang::Java,
"ruby" => ScriptLang::Ruby,
// for related places search: ADD_NEW_LANG
language => {
return Err(anyhow::anyhow!("{} is currently not supported", language))
}
language => return Err(anyhow::anyhow!("{} is currently not supported", language)),
};
Ok(language)
@@ -581,9 +594,7 @@ where
pub fn to_i64(s: &str) -> anyhow::Result<i64> {
let v = hex::decode(s)?;
if v.len() < 8 {
return Err(anyhow::anyhow!(
"hex string did not decode to an u64: {s}",
));
return Err(anyhow::anyhow!("hex string did not decode to an u64: {s}",));
}
let nb: u64 = u64::from_be_bytes(
v[0..8]

View File

@@ -562,6 +562,7 @@ pub async fn update_worker_ping_for_failed_init_script(
memory_usage: None,
wm_memory_usage: None,
job_isolation: None,
native_mode: None,
ping_type: PingType::InitScript,
},
)

View File

@@ -80,7 +80,7 @@ use windmill_common::{
scripts::{get_full_hub_script_by_path, ScriptHash, ScriptLang},
tracing_init::{QUIET_MODE, VERBOSE_TARGET},
utils::StripPath,
worker::{CLOUD_HOSTED, NO_LOGS, WORKER_CONFIG, WORKER_GROUP},
worker::{CLOUD_HOSTED, NATIVE_MODE_RESOLVED, NO_LOGS, WORKER_CONFIG, WORKER_GROUP},
DB, IS_READY,
};
@@ -3010,6 +3010,17 @@ pub async fn handle_queued_job(
_ => {}
}
if NATIVE_MODE_RESOLVED.load(std::sync::atomic::Ordering::Relaxed) {
if let Some(lang) = &job.script_lang {
if !lang.is_native() {
return Err(Error::ExecutionErr(format!(
"Worker is in native mode and cannot execute non-native job with language '{}'",
lang.as_str(),
)));
}
}
}
#[cfg(any(not(feature = "enterprise"), feature = "sqlx"))]
match conn {
Connection::Sql(db) => {

View File

@@ -7,8 +7,8 @@ use windmill_common::{
worker::{
get_memory, get_vcpus, get_windmill_memory_usage, get_worker_memory_usage,
insert_ping_query, update_job_ping_query, update_worker_ping_from_job_query,
update_worker_ping_main_loop_query, Connection, Ping, PingType, WORKER_CONFIG,
WORKER_GROUP,
update_worker_ping_main_loop_query, Connection, Ping, PingType, NATIVE_MODE_RESOLVED,
WORKER_CONFIG, WORKER_GROUP,
},
KillpillSender, DB,
};
@@ -27,7 +27,10 @@ pub(crate) async fn update_worker_ping_full(
occupancy_metrics: &mut OccupancyMetrics,
killpill_tx: &KillpillSender,
) {
let tags = WORKER_CONFIG.read().await.worker_tags.clone();
let wc = WORKER_CONFIG.read().await;
let tags = wc.worker_tags.clone();
let native_mode = wc.native_mode;
drop(wc);
let memory_usage = get_worker_memory_usage();
let wm_memory_usage = get_windmill_memory_usage();
@@ -60,6 +63,7 @@ pub(crate) async fn update_worker_ping_full(
occupancy_rate_15s,
occupancy_rate_5m,
occupancy_rate_30m,
native_mode,
)
})
.retry(
@@ -105,6 +109,7 @@ async fn update_worker_ping_full_inner(
occupancy_rate_15s: Option<f32>,
occupancy_rate_5m: Option<f32>,
occupancy_rate_30m: Option<f32>,
native_mode: bool,
) -> anyhow::Result<()> {
match conn {
Connection::Sql(db) => {
@@ -120,6 +125,7 @@ async fn update_worker_ping_full_inner(
occupancy_rate_15s,
occupancy_rate_5m,
occupancy_rate_30m,
native_mode,
db,
)
.await?;
@@ -148,6 +154,7 @@ async fn update_worker_ping_full_inner(
memory_usage: get_worker_memory_usage(),
wm_memory_usage: get_windmill_memory_usage(),
job_isolation: None,
native_mode: Some(native_mode),
ping_type: PingType::MainLoop,
},
)
@@ -163,7 +170,7 @@ pub async fn insert_ping(
ip: &str,
db: &Connection,
) -> anyhow::Result<()> {
let (tags, dw, dws) = {
let (tags, dw, dws, native_mode) = {
let wc = WORKER_CONFIG.read().await.clone();
(
wc.worker_tags,
@@ -176,6 +183,7 @@ pub async fn insert_ping(
.map(|x| format!("{}:{}", x.workspace_id, x.path))
.collect::<Vec<_>>()
}),
wc.native_mode,
)
};
@@ -204,6 +212,7 @@ pub async fn insert_ping(
vcpus,
memory,
job_isolation,
native_mode,
db,
)
.await?;
@@ -232,6 +241,7 @@ pub async fn insert_ping(
memory_usage: get_worker_memory_usage(),
wm_memory_usage: get_windmill_memory_usage(),
job_isolation,
native_mode: Some(native_mode),
ping_type: PingType::Initial,
},
)
@@ -305,6 +315,9 @@ pub async fn update_worker_ping_from_job(
occupancy_rate_5m: occupancy_rate_5m,
occupancy_rate_30m: occupancy_rate_30m,
job_isolation,
native_mode: Some(
NATIVE_MODE_RESOLVED.load(std::sync::atomic::Ordering::Relaxed),
),
},
)
.await?;

View File

@@ -103,6 +103,7 @@ services:
- DATABASE_URL=${DATABASE_URL}
- MODE=worker
- WORKER_GROUP=native
- NATIVE_MODE=true
- NUM_WORKERS=8
- SLEEP_QUEUE=200
depends_on:

View File

@@ -32,7 +32,12 @@
import Section from './Section.svelte'
import Label from './Label.svelte'
import Toggle from './Toggle.svelte'
import { cleanWorkerGroupConfig, defaultTags, nativeTags, type AutoscalingConfig } from './worker_group'
import {
cleanWorkerGroupConfig,
defaultTags,
nativeTags,
type AutoscalingConfig
} from './worker_group'
import AutoscalingConfigEditor from './AutoscalingConfigEditor.svelte'
import TagsToListenTo from './TagsToListenTo.svelte'
import Select from './select/Select.svelte'
@@ -87,6 +92,7 @@
pip_local_dependencies?: string[]
min_alive_workers_alert_threshold?: number
autoscaling?: AutoscalingConfig
native_mode?: boolean
} = $state({})
function loadNConfig() {
@@ -191,6 +197,7 @@
autoscaling?: AutoscalingConfig
periodic_script_bash?: string
periodic_script_interval_seconds?: number
native_mode?: boolean
}
activeWorkers: number
customTags: string[] | undefined
@@ -257,7 +264,8 @@
// Compute hashed tags for display (actual tags used by the worker)
let hashedDedicatedTags: Map<string, string> = $state(new Map())
$effect(() => {
const dws = config?.dedicated_workers ?? (config?.dedicated_worker ? [config.dedicated_worker] : [])
const dws =
config?.dedicated_workers ?? (config?.dedicated_worker ? [config.dedicated_worker] : [])
if (dws.length > 0) {
Promise.all(dws.map(async (dw) => [dw, await computeHashedTag(dw)] as const)).then(
(entries) => {
@@ -276,6 +284,14 @@
? 'dedicated'
: 'normal'
)
let isNativeMode = $derived(
config?.native_mode === true ||
name === 'native' ||
(workers.length > 0 &&
workers.some(([_, pings]) => pings.some((p) => p.native_mode === true)))
)
let nonNativeTags = $derived((nconfig?.worker_tags ?? []).filter((t) => !nativeTags.includes(t)))
let isAutoNativeMode = $derived(name === 'native')
$effect(() => {
;($superadmin || $devopsRole) && listWorkspaces()
})
@@ -497,7 +513,11 @@
{#if nconfig !== undefined}
<div class="mt-8"></div>
<Label label="Alerts" tooltip="Alert is sent to the configured critical error channels" eeOnly={!hasEnterpriseFeatures}>
<Label
label="Alerts"
tooltip="Alert is sent to the configured critical error channels"
eeOnly={!hasEnterpriseFeatures}
>
<Toggle
size="sm"
options={{
@@ -525,6 +545,43 @@
{/if}
</Label>
{/if}
{#if nconfig !== undefined}
<div class="mt-8"></div>
<Label label="Native mode">
{#snippet header()}
<Tooltip>
{#snippet text()}
When enabled, the worker will only accept native jobs (nativets, postgresql, mysql,
etc.) and automatically runs with NUM_WORKERS=8 for optimal throughput. Non-native
jobs will be failed.
{/snippet}
</Tooltip>
{/snippet}
<Toggle
size="sm"
options={{
right: isAutoNativeMode
? 'Native mode (automatically enabled)'
: 'Enable native mode'
}}
checked={nconfig?.native_mode === true || isAutoNativeMode}
on:change={(ev) => {
if (nconfig !== undefined) {
nconfig.native_mode = ev.detail ? true : undefined
}
}}
disabled={!canEditConfig || isAutoNativeMode}
/>
{#if (nconfig.native_mode || isAutoNativeMode) && nonNativeTags.length > 0}
<Alert size="xs" type="warning" title="Non-native tags detected">
This worker group has native mode enabled but includes non-native tags: {nonNativeTags.join(
', '
)}. Non-native jobs will be failed. This is fine if those custom tags are only used for native language jobs.
</Alert>
{/if}
</Label>
{/if}
{:else if selected == 'dedicated'}
<div class="flex flex-col gap-4">
{#if $superadmin || $devopsRole}
@@ -704,7 +761,12 @@
/>
<div class="mt-8"></div>
<Section label="Python dependencies overrides" collapsable={true} class="flex flex-col gap-y-6" eeOnly={!hasEnterpriseFeatures}>
<Section
label="Python dependencies overrides"
collapsable={true}
class="flex flex-col gap-y-6"
eeOnly={!hasEnterpriseFeatures}
>
<Label
label="Additional Python paths"
tooltip="Paths to add to the Python path for it to search dependencies, useful if you have packages pre-installed on the workers at a given path."
@@ -883,8 +945,8 @@
{#if (nconfig.periodic_script_bash ?? '') !== (config?.periodic_script_bash ?? '') || (nconfig.periodic_script_interval_seconds ?? 0) !== (config?.periodic_script_interval_seconds ?? 0)}
<div class="mb-2">
<Alert size="xs" type="info" title="Worker restart required">
Workers will get killed upon detecting changes to the periodic script or interval.
It is assumed they are in an environment where the supervisor will restart them.
Workers will get killed upon detecting changes to the periodic script or interval. It is
assumed they are in an environment where the supervisor will restart them.
</Alert>
</div>
{/if}
@@ -931,10 +993,13 @@
<div class="mt-8">
<Section label="Set via API" collapsable headerClass="text-secondary">
<p class="text-xs text-tertiary">Requires a superadmin token.</p>
<pre class="mt-1 p-2 bg-surface-secondary rounded text-xs overflow-x-auto whitespace-pre-wrap break-all">curl -X POST '{window.location.origin}/api/configs/update/worker__{name}' \
<pre
class="mt-1 p-2 bg-surface-secondary rounded text-xs overflow-x-auto whitespace-pre-wrap break-all"
>curl -X POST '{window.location.origin}/api/configs/update/worker__{name}' \
-H 'Content-Type: application/json' \
-H 'Authorization: Bearer &lt;token&gt;' \
-d '{JSON.stringify(cleanWorkerGroupConfig(nconfig ?? {}), null, 2)}'</pre>
-d '{JSON.stringify(cleanWorkerGroupConfig(nconfig ?? {}), null, 2)}'</pre
>
</Section>
</div>
{#snippet actions()}
@@ -1039,6 +1104,9 @@
>{#snippet text()}Number of active workers of this group in the last 15 seconds{/snippet}</Tooltip
></span
>
{#if isNativeMode}
<Badge color="blue" small>Native</Badge>
{/if}
</div>
{#if vcpus_memory?.vcpus}