Compare commits
1 Commits
rf/warnRaw
...
rf/bundle1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9007072274 |
@@ -59,7 +59,7 @@ flow_testing = ["windmill-worker/flow_testing"]
|
||||
openidconnect = ["windmill-api/openidconnect"]
|
||||
cloud = ["windmill-queue/cloud", "windmill-worker/cloud"]
|
||||
jemalloc = ["windmill-common/jemalloc", "dep:tikv-jemallocator", "dep:tikv-jemalloc-sys", "dep:tikv-jemalloc-ctl"]
|
||||
tantivy = ["dep:windmill-indexer", "windmill-api/tantivy", "windmill-indexer/enterprise", "windmill-indexer/parquet", "enterprise", "parquet"]
|
||||
tantivy = ["dep:windmill-indexer", "windmill-api/tantivy", "windmill-indexer/enterprise", "windmill-indexer/parquet", "windmill-common/tantivy", "enterprise", "parquet"]
|
||||
sqlx = ["windmill-worker/sqlx"]
|
||||
deno_core = ["windmill-worker/deno_core", "dep:deno_core", "dep:v8"]
|
||||
kafka = ["windmill-api/kafka"]
|
||||
|
||||
@@ -44,7 +44,7 @@ use windmill_common::{
|
||||
},
|
||||
scripts::ScriptLang,
|
||||
stats_ee::schedule_stats,
|
||||
utils::{hostname, rd_string, Mode, GIT_VERSION},
|
||||
utils::{hostname, rd_string, Mode, GIT_VERSION, MODE_AND_ADDONS},
|
||||
worker::{reload_custom_tags_setting, HUB_CACHE_DIR, TMP_DIR, TMP_LOGS_DIR, WORKER_GROUP},
|
||||
DB, METRICS_ENABLED,
|
||||
};
|
||||
@@ -238,63 +238,12 @@ async fn windmill_main() -> anyhow::Result<()> {
|
||||
|
||||
let hostname = hostname();
|
||||
|
||||
let mut enable_standalone_indexer: bool = false;
|
||||
let mode_and_addons = MODE_AND_ADDONS.clone();
|
||||
let mode = mode_and_addons.mode;
|
||||
|
||||
let mode = std::env::var("MODE")
|
||||
.map(|x| x.to_lowercase())
|
||||
.map(|x| {
|
||||
if &x == "server" {
|
||||
println!("Binary is in 'server' mode");
|
||||
Mode::Server
|
||||
} else if &x == "worker" {
|
||||
tracing::info!("Binary is in 'worker' mode");
|
||||
#[cfg(windows)]
|
||||
{
|
||||
println!("It is highly recommended to use the agent mode instead on windows (MODE=agent) and to pass a BASE_INTERNAL_URL");
|
||||
}
|
||||
Mode::Worker
|
||||
} else if &x == "agent" {
|
||||
println!("Binary is in 'agent' mode");
|
||||
if std::env::var("BASE_INTERNAL_URL").is_err() {
|
||||
panic!("BASE_INTERNAL_URL is required in agent mode")
|
||||
}
|
||||
if std::env::var("JOB_TOKEN").is_err() {
|
||||
println!("JOB_TOKEN is not passed, hence workers will still need to create permissions for each job and the DATABASE_URL needs to be of a role that can INSERT into the job_perms table")
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "enterprise"))]
|
||||
{
|
||||
panic!("Agent mode is only available in the EE, ignoring...");
|
||||
}
|
||||
#[cfg(feature = "enterprise")]
|
||||
Mode::Agent
|
||||
} else if &x == "indexer" {
|
||||
tracing::info!("Binary is in 'indexer' mode");
|
||||
#[cfg(not(feature = "tantivy"))]
|
||||
{
|
||||
eprintln!("Cannot start the indexer because tantivy is not included in this binary/image. Make sure you are using the EE image if you want to access the full text search features.");
|
||||
panic!("Indexer mode requires compiling with the tantivy feature flag.");
|
||||
}
|
||||
#[cfg(feature = "tantivy")]
|
||||
Mode::Indexer
|
||||
} else if &x == "standalone+search"{
|
||||
enable_standalone_indexer = true;
|
||||
println!("Binary is in 'standalone' mode with search enabled");
|
||||
Mode::Standalone
|
||||
}
|
||||
else {
|
||||
if &x != "standalone" {
|
||||
eprintln!("mode not recognized, defaulting to standalone: {x}");
|
||||
} else {
|
||||
println!("Binary is in 'standalone' mode");
|
||||
}
|
||||
Mode::Standalone
|
||||
}
|
||||
})
|
||||
.unwrap_or_else(|_| {
|
||||
tracing::info!("Mode not specified, defaulting to standalone");
|
||||
Mode::Standalone
|
||||
});
|
||||
if mode == Mode::Standalone {
|
||||
println!("Running in standalone mode");
|
||||
}
|
||||
|
||||
#[cfg(all(not(target_env = "msvc"), feature = "jemalloc"))]
|
||||
println!("jemalloc enabled");
|
||||
@@ -537,8 +486,7 @@ Windmill Community Edition {GIT_VERSION}
|
||||
.expect("could not create initial server dir");
|
||||
|
||||
#[cfg(feature = "tantivy")]
|
||||
let should_index_jobs =
|
||||
mode == Mode::Indexer || (enable_standalone_indexer && mode == Mode::Standalone);
|
||||
let should_index_jobs = mode == Mode::Indexer || mode_and_addons.indexer;
|
||||
|
||||
reload_indexer_config(&db).await;
|
||||
|
||||
|
||||
@@ -18,7 +18,6 @@ use crate::{
|
||||
webhook_util::{WebhookMessage, WebhookShared},
|
||||
HTTP_CLIENT,
|
||||
};
|
||||
#[cfg(all(feature = "enterprise", feature = "parquet"))]
|
||||
use axum::extract::Multipart;
|
||||
|
||||
use axum::{
|
||||
@@ -42,7 +41,6 @@ use std::{
|
||||
use windmill_audit::audit_ee::audit_log;
|
||||
use windmill_audit::ActionKind;
|
||||
|
||||
#[cfg(all(feature = "enterprise", feature = "parquet"))]
|
||||
use windmill_common::error::to_anyhow;
|
||||
|
||||
use windmill_common::{
|
||||
@@ -358,12 +356,6 @@ fn hash_script(ns: &NewScript) -> i64 {
|
||||
dh.finish() as i64
|
||||
}
|
||||
|
||||
#[cfg(not(all(feature = "enterprise", feature = "parquet")))]
|
||||
async fn create_snapshot_script() -> Result<(StatusCode, String)> {
|
||||
Err(Error::BadRequest("Upgrade to EE to use bundle".to_string()))
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "enterprise", feature = "parquet"))]
|
||||
async fn create_snapshot_script(
|
||||
authed: ApiAuthed,
|
||||
Extension(user_db): Extension<UserDB>,
|
||||
@@ -404,12 +396,35 @@ async fn create_snapshot_script(
|
||||
})?;
|
||||
|
||||
uploaded = true;
|
||||
let path = windmill_common::s3_helpers::bundle(&w_id, &hash);
|
||||
|
||||
if &windmill_common::utils::MODE_AND_ADDONS.mode
|
||||
== &windmill_common::utils::Mode::Standalone
|
||||
{
|
||||
std::fs::create_dir_all(format!(
|
||||
"{}/script_bundle/{}",
|
||||
windmill_common::worker::ROOT_CACHE_NOMOUNT_DIR,
|
||||
w_id
|
||||
))?;
|
||||
windmill_common::worker::write_file(
|
||||
windmill_common::worker::ROOT_CACHE_NOMOUNT_DIR,
|
||||
&path,
|
||||
&String::from_utf8_lossy(&data),
|
||||
)?;
|
||||
return Ok((StatusCode::CREATED, format!("{}", script_hash.unwrap())));
|
||||
}
|
||||
|
||||
#[cfg(not(all(feature = "enterprise", feature = "parquet")))]
|
||||
{
|
||||
return Err(Error::ExecutionErr("codebase is an EE feature".to_string()));
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "enterprise", feature = "parquet"))]
|
||||
if let Some(os) = windmill_common::s3_helpers::OBJECT_STORE_CACHE_SETTINGS
|
||||
.read()
|
||||
.await
|
||||
.clone()
|
||||
{
|
||||
let path = windmill_common::s3_helpers::bundle(&w_id, &hash);
|
||||
if let Err(e) = os
|
||||
.put(&object_store::path::Path::from(path.clone()), data.into())
|
||||
.await
|
||||
|
||||
@@ -8,6 +8,7 @@ edition.workspace = true
|
||||
default = []
|
||||
enterprise = []
|
||||
jemalloc = ["dep:tikv-jemalloc-ctl"]
|
||||
tantivy = []
|
||||
prometheus = ["dep:prometheus"]
|
||||
loki = ["dep:tracing-loki"]
|
||||
benchmark = []
|
||||
|
||||
@@ -53,6 +53,76 @@ lazy_static::lazy_static! {
|
||||
GIT_VERSION
|
||||
}
|
||||
).unwrap_or(Version::new(0, 1, 0));
|
||||
|
||||
|
||||
pub static ref MODE_AND_ADDONS: ModeAndAddons = {
|
||||
let mut search_addon = false;
|
||||
let mode = std::env::var("MODE")
|
||||
.map(|x| x.to_lowercase())
|
||||
.map(|x| {
|
||||
if &x == "server" {
|
||||
println!("Binary is in 'server' mode");
|
||||
Mode::Server
|
||||
} else if &x == "worker" {
|
||||
tracing::info!("Binary is in 'worker' mode");
|
||||
#[cfg(windows)]
|
||||
{
|
||||
println!("It is highly recommended to use the agent mode instead on windows (MODE=agent) and to pass a BASE_INTERNAL_URL");
|
||||
}
|
||||
Mode::Worker
|
||||
} else if &x == "agent" {
|
||||
println!("Binary is in 'agent' mode");
|
||||
if std::env::var("BASE_INTERNAL_URL").is_err() {
|
||||
panic!("BASE_INTERNAL_URL is required in agent mode")
|
||||
}
|
||||
if std::env::var("JOB_TOKEN").is_err() {
|
||||
println!("JOB_TOKEN is not passed, hence workers will still need to create permissions for each job and the DATABASE_URL needs to be of a role that can INSERT into the job_perms table")
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "enterprise"))]
|
||||
{
|
||||
panic!("Agent mode is only available in the EE, ignoring...");
|
||||
}
|
||||
#[cfg(feature = "enterprise")]
|
||||
Mode::Agent
|
||||
} else if &x == "indexer" {
|
||||
tracing::info!("Binary is in 'indexer' mode");
|
||||
#[cfg(not(feature = "tantivy"))]
|
||||
{
|
||||
eprintln!("Cannot start the indexer because tantivy is not included in this binary/image. Make sure you are using the EE image if you want to access the full text search features.");
|
||||
panic!("Indexer mode requires compiling with the tantivy feature flag.");
|
||||
}
|
||||
#[cfg(feature = "tantivy")]
|
||||
Mode::Indexer
|
||||
} else if &x == "standalone+search"{
|
||||
search_addon = true;
|
||||
println!("Binary is in 'standalone' mode with search enabled");
|
||||
Mode::Standalone
|
||||
}
|
||||
else {
|
||||
if &x != "standalone" {
|
||||
eprintln!("mode not recognized, defaulting to standalone: {x}");
|
||||
} else {
|
||||
println!("Binary is in 'standalone' mode");
|
||||
}
|
||||
Mode::Standalone
|
||||
}
|
||||
})
|
||||
.unwrap_or_else(|_| {
|
||||
tracing::info!("Mode not specified, defaulting to standalone");
|
||||
Mode::Standalone
|
||||
});
|
||||
ModeAndAddons {
|
||||
indexer: search_addon,
|
||||
mode,
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ModeAndAddons {
|
||||
pub indexer: bool,
|
||||
pub mode: Mode,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Clone)]
|
||||
|
||||
@@ -104,6 +104,7 @@ lazy_static::lazy_static! {
|
||||
pub static ref DISABLE_FLOW_SCRIPT: bool = std::env::var("DISABLE_FLOW_SCRIPT").ok().is_some_and(|x| x == "1" || x == "true");
|
||||
}
|
||||
|
||||
pub const ROOT_CACHE_NOMOUNT_DIR: &str = concatcp!(TMP_DIR, "/cache_nomount/");
|
||||
pub static MIN_VERSION_IS_LATEST: AtomicBool = AtomicBool::new(false);
|
||||
|
||||
fn format_pull_query(peek: String) -> String {
|
||||
|
||||
@@ -579,20 +579,22 @@ pub async fn generate_bun_bundle(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "enterprise", feature = "parquet"))]
|
||||
pub async fn pull_codebase(w_id: &str, id: &str, job_dir: &str) -> Result<()> {
|
||||
use crate::global_cache::extract_tar;
|
||||
|
||||
let path = windmill_common::s3_helpers::bundle(&w_id, &id);
|
||||
let bun_cache_path = format!("{}/{}", crate::ROOT_CACHE_NOMOUNT_DIR, path);
|
||||
let bun_cache_path = format!(
|
||||
"{}/{}",
|
||||
windmill_common::worker::ROOT_CACHE_NOMOUNT_DIR,
|
||||
path
|
||||
);
|
||||
let is_tar = id.ends_with(".tar");
|
||||
|
||||
let dst = format!(
|
||||
"{job_dir}/{}",
|
||||
if is_tar { "codebase.tar" } else { "main.js" }
|
||||
);
|
||||
let dirs_splitted = bun_cache_path.split("/").collect_vec();
|
||||
tokio::fs::create_dir_all(dirs_splitted[..dirs_splitted.len() - 1].join("/")).await?;
|
||||
|
||||
if tokio::fs::metadata(&bun_cache_path).await.is_ok() {
|
||||
tracing::info!("loading {bun_cache_path} from cache");
|
||||
if is_tar {
|
||||
@@ -604,35 +606,47 @@ pub async fn pull_codebase(w_id: &str, id: &str, job_dir: &str) -> Result<()> {
|
||||
#[cfg(windows)]
|
||||
std::os::windows::fs::symlink_dir(&bun_cache_path, &dst)?;
|
||||
}
|
||||
} else if let Some(os) = windmill_common::s3_helpers::OBJECT_STORE_CACHE_SETTINGS
|
||||
.read()
|
||||
.await
|
||||
.clone()
|
||||
{
|
||||
let bytes = attempt_fetch_bytes(os, &path).await?;
|
||||
|
||||
tokio::fs::write(&bun_cache_path, &bytes).await?;
|
||||
if is_tar {
|
||||
extract_tar(bytes, job_dir).await?;
|
||||
} else {
|
||||
#[cfg(unix)]
|
||||
tokio::fs::symlink(bun_cache_path, dst).await?;
|
||||
|
||||
#[cfg(windows)]
|
||||
std::os::windows::fs::symlink_dir(&bun_cache_path, &dst)?;
|
||||
} else {
|
||||
#[cfg(not(all(feature = "enterprise", feature = "parquet")))]
|
||||
{
|
||||
if &windmill_common::utils::MODE_AND_ADDONS.mode
|
||||
== &windmill_common::utils::Mode::Standalone
|
||||
{
|
||||
return Err(error::Error::ExecutionErr(format!(
|
||||
"(standalone bundle test mode) could not find codebase at {bun_cache_path}"
|
||||
)));
|
||||
} else {
|
||||
return Err(error::Error::ExecutionErr(
|
||||
"codebase is an EE feature".to_string(),
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
// extract_tar(bytes, job_dir).await?;
|
||||
#[cfg(all(feature = "enterprise", feature = "parquet"))]
|
||||
if let Some(os) = windmill_common::s3_helpers::OBJECT_STORE_CACHE_SETTINGS
|
||||
.read()
|
||||
.await
|
||||
.clone()
|
||||
{
|
||||
let dirs_splitted = bun_cache_path.split("/").collect_vec();
|
||||
tokio::fs::create_dir_all(dirs_splitted[..dirs_splitted.len() - 1].join("/")).await?;
|
||||
|
||||
let bytes = attempt_fetch_bytes(os, &path).await?;
|
||||
|
||||
tokio::fs::write(&bun_cache_path, &bytes).await?;
|
||||
if is_tar {
|
||||
extract_tar(bytes, job_dir).await?;
|
||||
} else {
|
||||
#[cfg(unix)]
|
||||
tokio::fs::symlink(bun_cache_path, dst).await?;
|
||||
|
||||
#[cfg(windows)]
|
||||
std::os::windows::fs::symlink_dir(&bun_cache_path, &dst)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
#[cfg(not(all(feature = "enterprise", feature = "parquet")))]
|
||||
pub async fn pull_codebase(_w_id: &str, _id: &str, _job_dir: &str) -> Result<()> {
|
||||
return Err(error::Error::ExecutionErr(
|
||||
"codebase is an EE feature".to_string(),
|
||||
));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn prebundle_bun_script(
|
||||
@@ -837,13 +851,6 @@ pub async fn handle_bun_job(
|
||||
let main_override = job.script_entrypoint_override.as_deref();
|
||||
let apply_preprocessor = !job.is_flow_step && job.preprocessed == Some(false);
|
||||
|
||||
#[cfg(not(feature = "enterprise"))]
|
||||
if annotation.nodejs || annotation.npm {
|
||||
return Err(error::Error::ExecutionErr(
|
||||
"Nodejs / npm mode is an EE feature".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
if has_bundle_cache {
|
||||
let target;
|
||||
let symlink;
|
||||
|
||||
@@ -1,15 +1,12 @@
|
||||
// #[cfg(feature = "enterprise")]
|
||||
// use rand::Rng;
|
||||
|
||||
#[cfg(all(feature = "enterprise", feature = "parquet"))]
|
||||
use tokio::time::Instant;
|
||||
use windmill_common::error;
|
||||
|
||||
#[cfg(all(feature = "enterprise", feature = "parquet", unix))]
|
||||
use object_store::ObjectStore;
|
||||
|
||||
#[cfg(all(feature = "enterprise", feature = "parquet"))]
|
||||
use windmill_common::error;
|
||||
|
||||
#[cfg(all(feature = "enterprise", feature = "parquet", unix))]
|
||||
use std::sync::Arc;
|
||||
|
||||
@@ -109,7 +106,6 @@ pub async fn pull_from_tar(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(all(feature = "enterprise", feature = "parquet"))]
|
||||
pub async fn extract_tar(tar: bytes::Bytes, folder: &str) -> error::Result<()> {
|
||||
use bytes::Buf;
|
||||
use tokio::fs::{self};
|
||||
|
||||
@@ -18,7 +18,7 @@ use windmill_common::{
|
||||
utils::WarnAfterExt,
|
||||
worker::{
|
||||
get_memory, get_vcpus, get_windmill_memory_usage, get_worker_memory_usage, write_file,
|
||||
ROOT_CACHE_DIR, TMP_DIR,
|
||||
ROOT_CACHE_DIR, ROOT_CACHE_NOMOUNT_DIR, TMP_DIR,
|
||||
},
|
||||
};
|
||||
|
||||
@@ -250,8 +250,6 @@ pub async fn create_token_for_owner(
|
||||
Ok(format!("jwt_{}", token))
|
||||
}
|
||||
|
||||
pub const ROOT_CACHE_NOMOUNT_DIR: &str = concatcp!(TMP_DIR, "/cache_nomount/");
|
||||
|
||||
pub const PY310_CACHE_DIR: &str = concatcp!(ROOT_CACHE_DIR, "python_310");
|
||||
pub const PY311_CACHE_DIR: &str = concatcp!(ROOT_CACHE_DIR, "python_311");
|
||||
pub const PY312_CACHE_DIR: &str = concatcp!(ROOT_CACHE_DIR, "python_312");
|
||||
@@ -274,6 +272,7 @@ pub const RUST_CACHE_DIR: &str = concatcp!(ROOT_CACHE_DIR, "rust");
|
||||
pub const CSHARP_CACHE_DIR: &str = concatcp!(ROOT_CACHE_DIR, "csharp");
|
||||
pub const BUN_CACHE_DIR: &str = concatcp!(ROOT_CACHE_NOMOUNT_DIR, "bun");
|
||||
pub const BUN_BUNDLE_CACHE_DIR: &str = concatcp!(ROOT_CACHE_DIR, "bun");
|
||||
pub const BUN_CODEBASE_BUNDLE_CACHE_DIR: &str = concatcp!(ROOT_CACHE_NOMOUNT_DIR, "script_bundle");
|
||||
|
||||
pub const GO_BIN_CACHE_DIR: &str = concatcp!(ROOT_CACHE_DIR, "gobin");
|
||||
pub const POWERSHELL_CACHE_DIR: &str = concatcp!(ROOT_CACHE_DIR, "powershell");
|
||||
|
||||
Reference in New Issue
Block a user