Compare commits

...

1 Commits

Author SHA1 Message Date
Ruben Fiszel
9007072274 all 2025-03-04 15:24:10 +01:00
9 changed files with 150 additions and 113 deletions

View File

@@ -59,7 +59,7 @@ flow_testing = ["windmill-worker/flow_testing"]
openidconnect = ["windmill-api/openidconnect"]
cloud = ["windmill-queue/cloud", "windmill-worker/cloud"]
jemalloc = ["windmill-common/jemalloc", "dep:tikv-jemallocator", "dep:tikv-jemalloc-sys", "dep:tikv-jemalloc-ctl"]
tantivy = ["dep:windmill-indexer", "windmill-api/tantivy", "windmill-indexer/enterprise", "windmill-indexer/parquet", "enterprise", "parquet"]
tantivy = ["dep:windmill-indexer", "windmill-api/tantivy", "windmill-indexer/enterprise", "windmill-indexer/parquet", "windmill-common/tantivy", "enterprise", "parquet"]
sqlx = ["windmill-worker/sqlx"]
deno_core = ["windmill-worker/deno_core", "dep:deno_core", "dep:v8"]
kafka = ["windmill-api/kafka"]

View File

@@ -44,7 +44,7 @@ use windmill_common::{
},
scripts::ScriptLang,
stats_ee::schedule_stats,
utils::{hostname, rd_string, Mode, GIT_VERSION},
utils::{hostname, rd_string, Mode, GIT_VERSION, MODE_AND_ADDONS},
worker::{reload_custom_tags_setting, HUB_CACHE_DIR, TMP_DIR, TMP_LOGS_DIR, WORKER_GROUP},
DB, METRICS_ENABLED,
};
@@ -238,63 +238,12 @@ async fn windmill_main() -> anyhow::Result<()> {
let hostname = hostname();
let mut enable_standalone_indexer: bool = false;
let mode_and_addons = MODE_AND_ADDONS.clone();
let mode = mode_and_addons.mode;
let mode = std::env::var("MODE")
.map(|x| x.to_lowercase())
.map(|x| {
if &x == "server" {
println!("Binary is in 'server' mode");
Mode::Server
} else if &x == "worker" {
tracing::info!("Binary is in 'worker' mode");
#[cfg(windows)]
{
println!("It is highly recommended to use the agent mode instead on windows (MODE=agent) and to pass a BASE_INTERNAL_URL");
}
Mode::Worker
} else if &x == "agent" {
println!("Binary is in 'agent' mode");
if std::env::var("BASE_INTERNAL_URL").is_err() {
panic!("BASE_INTERNAL_URL is required in agent mode")
}
if std::env::var("JOB_TOKEN").is_err() {
println!("JOB_TOKEN is not passed, hence workers will still need to create permissions for each job and the DATABASE_URL needs to be of a role that can INSERT into the job_perms table")
}
#[cfg(not(feature = "enterprise"))]
{
panic!("Agent mode is only available in the EE, ignoring...");
}
#[cfg(feature = "enterprise")]
Mode::Agent
} else if &x == "indexer" {
tracing::info!("Binary is in 'indexer' mode");
#[cfg(not(feature = "tantivy"))]
{
eprintln!("Cannot start the indexer because tantivy is not included in this binary/image. Make sure you are using the EE image if you want to access the full text search features.");
panic!("Indexer mode requires compiling with the tantivy feature flag.");
}
#[cfg(feature = "tantivy")]
Mode::Indexer
} else if &x == "standalone+search"{
enable_standalone_indexer = true;
println!("Binary is in 'standalone' mode with search enabled");
Mode::Standalone
}
else {
if &x != "standalone" {
eprintln!("mode not recognized, defaulting to standalone: {x}");
} else {
println!("Binary is in 'standalone' mode");
}
Mode::Standalone
}
})
.unwrap_or_else(|_| {
tracing::info!("Mode not specified, defaulting to standalone");
Mode::Standalone
});
if mode == Mode::Standalone {
println!("Running in standalone mode");
}
#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
println!("jemalloc enabled");
@@ -537,8 +486,7 @@ Windmill Community Edition {GIT_VERSION}
.expect("could not create initial server dir");
#[cfg(feature = "tantivy")]
let should_index_jobs =
mode == Mode::Indexer || (enable_standalone_indexer && mode == Mode::Standalone);
let should_index_jobs = mode == Mode::Indexer || mode_and_addons.indexer;
reload_indexer_config(&db).await;

View File

@@ -18,7 +18,6 @@ use crate::{
webhook_util::{WebhookMessage, WebhookShared},
HTTP_CLIENT,
};
#[cfg(all(feature = "enterprise", feature = "parquet"))]
use axum::extract::Multipart;
use axum::{
@@ -42,7 +41,6 @@ use std::{
use windmill_audit::audit_ee::audit_log;
use windmill_audit::ActionKind;
#[cfg(all(feature = "enterprise", feature = "parquet"))]
use windmill_common::error::to_anyhow;
use windmill_common::{
@@ -358,12 +356,6 @@ fn hash_script(ns: &NewScript) -> i64 {
dh.finish() as i64
}
#[cfg(not(all(feature = "enterprise", feature = "parquet")))]
async fn create_snapshot_script() -> Result<(StatusCode, String)> {
Err(Error::BadRequest("Upgrade to EE to use bundle".to_string()))
}
#[cfg(all(feature = "enterprise", feature = "parquet"))]
async fn create_snapshot_script(
authed: ApiAuthed,
Extension(user_db): Extension<UserDB>,
@@ -404,12 +396,35 @@ async fn create_snapshot_script(
})?;
uploaded = true;
let path = windmill_common::s3_helpers::bundle(&w_id, &hash);
if &windmill_common::utils::MODE_AND_ADDONS.mode
== &windmill_common::utils::Mode::Standalone
{
std::fs::create_dir_all(format!(
"{}/script_bundle/{}",
windmill_common::worker::ROOT_CACHE_NOMOUNT_DIR,
w_id
))?;
windmill_common::worker::write_file(
windmill_common::worker::ROOT_CACHE_NOMOUNT_DIR,
&path,
&String::from_utf8_lossy(&data),
)?;
return Ok((StatusCode::CREATED, format!("{}", script_hash.unwrap())));
}
#[cfg(not(all(feature = "enterprise", feature = "parquet")))]
{
return Err(Error::ExecutionErr("codebase is an EE feature".to_string()));
}
#[cfg(all(feature = "enterprise", feature = "parquet"))]
if let Some(os) = windmill_common::s3_helpers::OBJECT_STORE_CACHE_SETTINGS
.read()
.await
.clone()
{
let path = windmill_common::s3_helpers::bundle(&w_id, &hash);
if let Err(e) = os
.put(&object_store::path::Path::from(path.clone()), data.into())
.await

View File

@@ -8,6 +8,7 @@ edition.workspace = true
default = []
enterprise = []
jemalloc = ["dep:tikv-jemalloc-ctl"]
tantivy = []
prometheus = ["dep:prometheus"]
loki = ["dep:tracing-loki"]
benchmark = []

View File

@@ -53,6 +53,76 @@ lazy_static::lazy_static! {
GIT_VERSION
}
).unwrap_or(Version::new(0, 1, 0));
pub static ref MODE_AND_ADDONS: ModeAndAddons = {
let mut search_addon = false;
let mode = std::env::var("MODE")
.map(|x| x.to_lowercase())
.map(|x| {
if &x == "server" {
println!("Binary is in 'server' mode");
Mode::Server
} else if &x == "worker" {
tracing::info!("Binary is in 'worker' mode");
#[cfg(windows)]
{
println!("It is highly recommended to use the agent mode instead on windows (MODE=agent) and to pass a BASE_INTERNAL_URL");
}
Mode::Worker
} else if &x == "agent" {
println!("Binary is in 'agent' mode");
if std::env::var("BASE_INTERNAL_URL").is_err() {
panic!("BASE_INTERNAL_URL is required in agent mode")
}
if std::env::var("JOB_TOKEN").is_err() {
println!("JOB_TOKEN is not passed, hence workers will still need to create permissions for each job and the DATABASE_URL needs to be of a role that can INSERT into the job_perms table")
}
#[cfg(not(feature = "enterprise"))]
{
panic!("Agent mode is only available in the EE, ignoring...");
}
#[cfg(feature = "enterprise")]
Mode::Agent
} else if &x == "indexer" {
tracing::info!("Binary is in 'indexer' mode");
#[cfg(not(feature = "tantivy"))]
{
eprintln!("Cannot start the indexer because tantivy is not included in this binary/image. Make sure you are using the EE image if you want to access the full text search features.");
panic!("Indexer mode requires compiling with the tantivy feature flag.");
}
#[cfg(feature = "tantivy")]
Mode::Indexer
} else if &x == "standalone+search"{
search_addon = true;
println!("Binary is in 'standalone' mode with search enabled");
Mode::Standalone
}
else {
if &x != "standalone" {
eprintln!("mode not recognized, defaulting to standalone: {x}");
} else {
println!("Binary is in 'standalone' mode");
}
Mode::Standalone
}
})
.unwrap_or_else(|_| {
tracing::info!("Mode not specified, defaulting to standalone");
Mode::Standalone
});
ModeAndAddons {
indexer: search_addon,
mode,
}
};
}
#[derive(Clone)]
pub struct ModeAndAddons {
pub indexer: bool,
pub mode: Mode,
}
#[derive(Deserialize, Clone)]

View File

@@ -104,6 +104,7 @@ lazy_static::lazy_static! {
pub static ref DISABLE_FLOW_SCRIPT: bool = std::env::var("DISABLE_FLOW_SCRIPT").ok().is_some_and(|x| x == "1" || x == "true");
}
pub const ROOT_CACHE_NOMOUNT_DIR: &str = concatcp!(TMP_DIR, "/cache_nomount/");
pub static MIN_VERSION_IS_LATEST: AtomicBool = AtomicBool::new(false);
fn format_pull_query(peek: String) -> String {

View File

@@ -579,20 +579,22 @@ pub async fn generate_bun_bundle(
Ok(())
}
#[cfg(all(feature = "enterprise", feature = "parquet"))]
pub async fn pull_codebase(w_id: &str, id: &str, job_dir: &str) -> Result<()> {
use crate::global_cache::extract_tar;
let path = windmill_common::s3_helpers::bundle(&w_id, &id);
let bun_cache_path = format!("{}/{}", crate::ROOT_CACHE_NOMOUNT_DIR, path);
let bun_cache_path = format!(
"{}/{}",
windmill_common::worker::ROOT_CACHE_NOMOUNT_DIR,
path
);
let is_tar = id.ends_with(".tar");
let dst = format!(
"{job_dir}/{}",
if is_tar { "codebase.tar" } else { "main.js" }
);
let dirs_splitted = bun_cache_path.split("/").collect_vec();
tokio::fs::create_dir_all(dirs_splitted[..dirs_splitted.len() - 1].join("/")).await?;
if tokio::fs::metadata(&bun_cache_path).await.is_ok() {
tracing::info!("loading {bun_cache_path} from cache");
if is_tar {
@@ -604,35 +606,47 @@ pub async fn pull_codebase(w_id: &str, id: &str, job_dir: &str) -> Result<()> {
#[cfg(windows)]
std::os::windows::fs::symlink_dir(&bun_cache_path, &dst)?;
}
} else if let Some(os) = windmill_common::s3_helpers::OBJECT_STORE_CACHE_SETTINGS
.read()
.await
.clone()
{
let bytes = attempt_fetch_bytes(os, &path).await?;
tokio::fs::write(&bun_cache_path, &bytes).await?;
if is_tar {
extract_tar(bytes, job_dir).await?;
} else {
#[cfg(unix)]
tokio::fs::symlink(bun_cache_path, dst).await?;
#[cfg(windows)]
std::os::windows::fs::symlink_dir(&bun_cache_path, &dst)?;
} else {
#[cfg(not(all(feature = "enterprise", feature = "parquet")))]
{
if &windmill_common::utils::MODE_AND_ADDONS.mode
== &windmill_common::utils::Mode::Standalone
{
return Err(error::Error::ExecutionErr(format!(
"(standalone bundle test mode) could not find codebase at {bun_cache_path}"
)));
} else {
return Err(error::Error::ExecutionErr(
"codebase is an EE feature".to_string(),
));
}
}
// extract_tar(bytes, job_dir).await?;
#[cfg(all(feature = "enterprise", feature = "parquet"))]
if let Some(os) = windmill_common::s3_helpers::OBJECT_STORE_CACHE_SETTINGS
.read()
.await
.clone()
{
let dirs_splitted = bun_cache_path.split("/").collect_vec();
tokio::fs::create_dir_all(dirs_splitted[..dirs_splitted.len() - 1].join("/")).await?;
let bytes = attempt_fetch_bytes(os, &path).await?;
tokio::fs::write(&bun_cache_path, &bytes).await?;
if is_tar {
extract_tar(bytes, job_dir).await?;
} else {
#[cfg(unix)]
tokio::fs::symlink(bun_cache_path, dst).await?;
#[cfg(windows)]
std::os::windows::fs::symlink_dir(&bun_cache_path, &dst)?;
}
}
}
return Ok(());
}
#[cfg(not(all(feature = "enterprise", feature = "parquet")))]
pub async fn pull_codebase(_w_id: &str, _id: &str, _job_dir: &str) -> Result<()> {
return Err(error::Error::ExecutionErr(
"codebase is an EE feature".to_string(),
));
Ok(())
}
pub async fn prebundle_bun_script(
@@ -837,13 +851,6 @@ pub async fn handle_bun_job(
let main_override = job.script_entrypoint_override.as_deref();
let apply_preprocessor = !job.is_flow_step && job.preprocessed == Some(false);
#[cfg(not(feature = "enterprise"))]
if annotation.nodejs || annotation.npm {
return Err(error::Error::ExecutionErr(
"Nodejs / npm mode is an EE feature".to_string(),
));
}
if has_bundle_cache {
let target;
let symlink;

View File

@@ -1,15 +1,12 @@
// #[cfg(feature = "enterprise")]
// use rand::Rng;
#[cfg(all(feature = "enterprise", feature = "parquet"))]
use tokio::time::Instant;
use windmill_common::error;
#[cfg(all(feature = "enterprise", feature = "parquet", unix))]
use object_store::ObjectStore;
#[cfg(all(feature = "enterprise", feature = "parquet"))]
use windmill_common::error;
#[cfg(all(feature = "enterprise", feature = "parquet", unix))]
use std::sync::Arc;
@@ -109,7 +106,6 @@ pub async fn pull_from_tar(
Ok(())
}
#[cfg(all(feature = "enterprise", feature = "parquet"))]
pub async fn extract_tar(tar: bytes::Bytes, folder: &str) -> error::Result<()> {
use bytes::Buf;
use tokio::fs::{self};

View File

@@ -18,7 +18,7 @@ use windmill_common::{
utils::WarnAfterExt,
worker::{
get_memory, get_vcpus, get_windmill_memory_usage, get_worker_memory_usage, write_file,
ROOT_CACHE_DIR, TMP_DIR,
ROOT_CACHE_DIR, ROOT_CACHE_NOMOUNT_DIR, TMP_DIR,
},
};
@@ -250,8 +250,6 @@ pub async fn create_token_for_owner(
Ok(format!("jwt_{}", token))
}
pub const ROOT_CACHE_NOMOUNT_DIR: &str = concatcp!(TMP_DIR, "/cache_nomount/");
pub const PY310_CACHE_DIR: &str = concatcp!(ROOT_CACHE_DIR, "python_310");
pub const PY311_CACHE_DIR: &str = concatcp!(ROOT_CACHE_DIR, "python_311");
pub const PY312_CACHE_DIR: &str = concatcp!(ROOT_CACHE_DIR, "python_312");
@@ -274,6 +272,7 @@ pub const RUST_CACHE_DIR: &str = concatcp!(ROOT_CACHE_DIR, "rust");
pub const CSHARP_CACHE_DIR: &str = concatcp!(ROOT_CACHE_DIR, "csharp");
pub const BUN_CACHE_DIR: &str = concatcp!(ROOT_CACHE_NOMOUNT_DIR, "bun");
pub const BUN_BUNDLE_CACHE_DIR: &str = concatcp!(ROOT_CACHE_DIR, "bun");
pub const BUN_CODEBASE_BUNDLE_CACHE_DIR: &str = concatcp!(ROOT_CACHE_NOMOUNT_DIR, "script_bundle");
pub const GO_BIN_CACHE_DIR: &str = concatcp!(ROOT_CACHE_DIR, "gobin");
pub const POWERSHELL_CACHE_DIR: &str = concatcp!(ROOT_CACHE_DIR, "powershell");