Separate duckdb crate to fix c++ build/link issues (#6551)
* call ffi * remove duckdb dep * rename windmill_duckdb_ffi_internal * static lib * ci * back to dylib, bug isn't fixed in static * feature flag and copy dynamic lib * fix dynlib in docker * load libwindmill_duckdb_ffi_internal at runtime on usage * lazy static deadlocks * Cache dynamic library handles * update auto s3 path insert from editor bar * Fix duckdb S3 freezing worker because of blocking task in tokio async context * build dll windows GH workflow * try fix windows build * revert build.rs * nit fixes CI * Dockerfile update (not tested yet * build dev sh for duckdb lib * mistake * attach windmill_duckdb_ffi_internal.so artefact * rhel9 * docker fixes * fix dockerfile * better err msg * forgot lib prefix .so * add column_order * fix column_order
This commit is contained in:
@@ -3,3 +3,4 @@ frontend/build/
|
||||
frontend/.svelte-kit/
|
||||
|
||||
backend/target/
|
||||
backend/windmill-duckdb-ffi-internal/target/
|
||||
14
.github/workflows/build-publish-rh-image.yml
vendored
14
.github/workflows/build-publish-rh-image.yml
vendored
@@ -97,6 +97,12 @@ jobs:
|
||||
image: ${{ steps.meta-ee-public.outputs.tags}}-amd64
|
||||
path: "/windmill/target/release/windmill"
|
||||
|
||||
- uses: shrink/actions-docker-extract@v3
|
||||
id: extract-duckdb-ffi-internal
|
||||
with:
|
||||
image: ${{ steps.meta-ee-public.outputs.tags}}-amd64
|
||||
path: "/usr/src/app/libwindmill_duckdb_ffi_internal.so"
|
||||
|
||||
# - uses: shrink/actions-docker-extract@v3
|
||||
# id: extract-ee-arm64
|
||||
# with:
|
||||
@@ -111,8 +117,12 @@ jobs:
|
||||
- uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: RHEL9-amd64 build
|
||||
path: ${{ steps.extract-ee-amd64.outputs.destination
|
||||
}}/windmill-ee-amd64-rhel9
|
||||
path: ${{ steps.extract-ee-amd64.outputs.destination }}/windmill-ee-amd64-rhel9
|
||||
|
||||
- uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: RHEL9-amd64 dynamic libraries build
|
||||
path: ${{ steps.extract-duckdb-ffi-internal.outputs.destination }}/libwindmill_duckdb_ffi_internal.so
|
||||
|
||||
# - uses: actions/upload-artifact@v4
|
||||
# with:
|
||||
|
||||
16
.github/workflows/build_windows_worker_.yml
vendored
16
.github/workflows/build_windows_worker_.yml
vendored
@@ -41,7 +41,13 @@ jobs:
|
||||
run: |
|
||||
./backend/substitute_ee_code.sh --copy --dir ./windmill-ee-private
|
||||
|
||||
- name: Cargo build windows
|
||||
- name: Cargo build dynamic libraries windows
|
||||
timeout-minutes: 90
|
||||
run: |
|
||||
cd backend/windmill-duckdb-ffi-internal
|
||||
cargo build --release -p windmill_duckdb_ffi_internal
|
||||
|
||||
- name: Cargo build binary windows
|
||||
timeout-minutes: 90
|
||||
run: |
|
||||
vcpkg.exe install openssl-windows:x64-windows
|
||||
@@ -56,8 +62,14 @@ jobs:
|
||||
run: |
|
||||
Rename-Item -Path ".\backend\target\release\windmill.exe" -NewName "windmill-ee.exe"
|
||||
|
||||
- name: Upload artifact
|
||||
- name: Upload binary artifact
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: windmill-ee-binary
|
||||
path: ./backend/target/release/windmill-ee.exe
|
||||
|
||||
- name: Upload dynamic libraries artifact
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: windmill_duckdb_ffi_internal.dll
|
||||
path: ./backend/windmill-duckdb-ffi-internal/target/release/windmill_duckdb_ffi_internal.dll
|
||||
|
||||
7
.github/workflows/docker-image.yml
vendored
7
.github/workflows/docker-image.yml
vendored
@@ -220,6 +220,12 @@ jobs:
|
||||
image: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ env.DEV_SHA }}
|
||||
path: "/usr/src/app/windmill"
|
||||
|
||||
- uses: shrink/actions-docker-extract@v3
|
||||
id: extract-duckdb-ffi-internal
|
||||
with:
|
||||
image: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ env.DEV_SHA }}
|
||||
path: "/usr/src/app/libwindmill_duckdb_ffi_internal.so"
|
||||
|
||||
- uses: shrink/actions-docker-extract@v3
|
||||
id: extract-ee
|
||||
with:
|
||||
@@ -237,6 +243,7 @@ jobs:
|
||||
files: |
|
||||
${{ steps.extract.outputs.destination }}/*
|
||||
${{ steps.extract-ee.outputs.destination }}/*
|
||||
${{ steps.extract-duckdb-ffi-internal.outputs.destination }}/*
|
||||
|
||||
# attach_arm64_binary_to_release:
|
||||
# needs: [build, build_ee]
|
||||
|
||||
12
.github/workflows/publish_windows_worker.yml
vendored
12
.github/workflows/publish_windows_worker.yml
vendored
@@ -43,6 +43,12 @@ jobs:
|
||||
run: |
|
||||
./backend/substitute_ee_code.sh --copy --dir ./windmill-ee-private
|
||||
|
||||
- name: Cargo build dynamic libraries windows
|
||||
timeout-minutes: 90
|
||||
run: |
|
||||
cd backend/windmill-duckdb-ffi-internal
|
||||
cargo build --release -p windmill_duckdb_ffi_internal
|
||||
|
||||
- name: Cargo build windows
|
||||
timeout-minutes: 90
|
||||
run: |
|
||||
@@ -63,3 +69,9 @@ jobs:
|
||||
with:
|
||||
files: |
|
||||
./backend/target/release/windmill-ee.exe
|
||||
|
||||
- name: Attach dynamic libraries to release
|
||||
uses: softprops/action-gh-release@v2
|
||||
with:
|
||||
files: |
|
||||
./backend/windmill-duckdb-ffi-internal/target/release/windmill_duckdb_ffi_internal.dll
|
||||
|
||||
13
Dockerfile
13
Dockerfile
@@ -1,6 +1,16 @@
|
||||
ARG DEBIAN_IMAGE=debian:bookworm-slim
|
||||
ARG RUST_IMAGE=rust:1.88-slim-bookworm
|
||||
|
||||
# Build libwindmill_duckdb_ffi_internal.so separately
|
||||
FROM ${RUST_IMAGE} AS windmill_duckdb_ffi_internal_builder
|
||||
|
||||
WORKDIR /windmill-duckdb-ffi-internal
|
||||
RUN apt-get update && apt-get install -y pkg-config clang=1:14.0-55.* libclang-dev=1:14.0-55.* cmake=3.25.* && \
|
||||
apt-get clean && \
|
||||
rm -rf /var/lib/apt/lists/*
|
||||
COPY ./backend/windmill-duckdb-ffi-internal .
|
||||
RUN cargo build --release -p windmill_duckdb_ffi_internal
|
||||
|
||||
FROM ${RUST_IMAGE} AS rust_base
|
||||
|
||||
RUN apt-get update && apt-get install -y git libssl-dev pkg-config npm
|
||||
@@ -82,7 +92,6 @@ RUN --mount=type=cache,target=/usr/local/cargo/registry \
|
||||
--mount=type=cache,target=$SCCACHE_DIR,sharing=locked \
|
||||
CARGO_NET_GIT_FETCH_WITH_CLI=true cargo build --release --features "$features"
|
||||
|
||||
|
||||
FROM ${DEBIAN_IMAGE}
|
||||
|
||||
ARG TARGETPLATFORM
|
||||
@@ -191,6 +200,7 @@ ENV TZ=Etc/UTC
|
||||
|
||||
COPY --from=builder /frontend/build /static_frontend
|
||||
COPY --from=builder /windmill/target/release/windmill ${APP}/windmill
|
||||
COPY --from=windmill_duckdb_ffi_internal_builder /windmill-duckdb-ffi-internal/target/release/libwindmill_duckdb_ffi_internal.so ${APP}/libwindmill_duckdb_ffi_internal.so
|
||||
|
||||
COPY --from=denoland/deno:2.2.1 --chmod=755 /usr/bin/deno /usr/bin/deno
|
||||
|
||||
@@ -204,6 +214,7 @@ COPY --from=docker:dind /usr/local/bin/docker /usr/local/bin/
|
||||
|
||||
ENV RUSTUP_HOME="/usr/local/rustup"
|
||||
ENV CARGO_HOME="/usr/local/cargo"
|
||||
ENV LD_LIBRARY_PATH="."
|
||||
|
||||
WORKDIR ${APP}
|
||||
|
||||
|
||||
@@ -5,10 +5,12 @@ incremental = true
|
||||
rustflags = [
|
||||
"-C", "link-arg=-undefined",
|
||||
"-C", "link-arg=dynamic_lookup",
|
||||
"-C", "link-args=-Wl,-rpath,$ORIGIN/"
|
||||
]
|
||||
|
||||
[target.aarch64-apple-darwin]
|
||||
rustflags = [
|
||||
"-C", "link-arg=-undefined",
|
||||
"-C", "link-arg=dynamic_lookup",
|
||||
"-C", "link-args=-Wl,-rpath,$ORIGIN/"
|
||||
]
|
||||
45
backend/Cargo.lock
generated
45
backend/Cargo.lock
generated
@@ -458,9 +458,6 @@ name = "arrow-schema"
|
||||
version = "55.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "af7686986a3bf2254c9fb130c623cdcb2f8e1f15763e7c71c310f0834da3d292"
|
||||
dependencies = [
|
||||
"bitflags 2.9.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "arrow-select"
|
||||
@@ -1915,12 +1912,6 @@ dependencies = [
|
||||
"syn 2.0.106",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cast"
|
||||
version = "0.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
|
||||
|
||||
[[package]]
|
||||
name = "cbc"
|
||||
version = "0.1.2"
|
||||
@@ -4788,24 +4779,6 @@ dependencies = [
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "duckdb"
|
||||
version = "1.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "07ab83a22530667ffc8cc0e31c0549bb07bea5dba3b957a8e315effc38923701"
|
||||
dependencies = [
|
||||
"arrow",
|
||||
"cast",
|
||||
"fallible-iterator 0.3.0",
|
||||
"fallible-streaming-iterator",
|
||||
"hashlink 0.10.0",
|
||||
"libduckdb-sys",
|
||||
"num-integer",
|
||||
"rust_decimal",
|
||||
"smallvec",
|
||||
"strum 0.27.2",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "dunce"
|
||||
version = "1.0.5"
|
||||
@@ -7620,21 +7593,6 @@ version = "0.2.175"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6a82ae493e598baaea5209805c49bbf2ea7de956d50d7da0da1164f9c6d28543"
|
||||
|
||||
[[package]]
|
||||
name = "libduckdb-sys"
|
||||
version = "1.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "4e02f6069513efb67a0743aff3b846090de14763802b0e95c352ebc6e1bdc1da"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"flate2",
|
||||
"pkg-config",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tar",
|
||||
"vcpkg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "libffi"
|
||||
version = "3.2.0"
|
||||
@@ -15168,6 +15126,7 @@ dependencies = [
|
||||
"k8s-openapi",
|
||||
"kube",
|
||||
"lazy_static",
|
||||
"libloading 0.8.8",
|
||||
"memchr",
|
||||
"object_store",
|
||||
"once_cell",
|
||||
@@ -15803,7 +15762,6 @@ dependencies = [
|
||||
"deno_web",
|
||||
"deno_webidl",
|
||||
"dotenv",
|
||||
"duckdb",
|
||||
"dyn-iter",
|
||||
"flume",
|
||||
"futures",
|
||||
@@ -15813,6 +15771,7 @@ dependencies = [
|
||||
"itertools 0.14.0",
|
||||
"jsonwebtoken 8.3.0",
|
||||
"lazy_static",
|
||||
"libloading 0.8.8",
|
||||
"mappable-rc",
|
||||
"mysql_async",
|
||||
"native-tls",
|
||||
|
||||
@@ -31,6 +31,7 @@ members = [
|
||||
"./parsers/windmill-sql-datatype-parser-wasm",
|
||||
"./parsers/windmill-parser-yaml", "windmill-macros", "parsers/windmill-parser-nu"
|
||||
]
|
||||
exclude = ["./windmill-duckdb-ffi-internal"]
|
||||
|
||||
[workspace.package]
|
||||
version = "1.541.0"
|
||||
@@ -99,8 +100,7 @@ java = ["windmill-worker/java"]
|
||||
ruby = ["windmill-worker/ruby"]
|
||||
all_languages = ["python", "deno_core", "rust", "mysql", "oracledb", "duckdb", "mssql", "bigquery", "csharp", "nu", "php", "java", "ruby"]
|
||||
# For windows we have another set of languages enabled
|
||||
# NOTE: DuckDB is ignored because of compilation problems
|
||||
all_languages_windows = ["python", "deno_core", "rust", "mysql", "oracledb", "mssql", "bigquery", "csharp", "nu", "php", "java"]
|
||||
all_languages_windows = ["python", "deno_core", "rust", "mysql", "oracledb", "duckdb", "mssql", "bigquery", "csharp", "nu", "php", "java"]
|
||||
|
||||
[patch.crates-io]
|
||||
object_store = { git = "https://github.com/apache/arrow-rs-object-store", rev = "36752c975d4f29e20b57c91f81a10872dcd48ae7" }
|
||||
@@ -150,6 +150,7 @@ aws-sigv4.workspace = true
|
||||
aws-sdk-config.workspace = true
|
||||
kube.workspace = true
|
||||
k8s-openapi.workspace = true
|
||||
libloading.workspace = true
|
||||
|
||||
[target.'cfg(not(target_env = "msvc"))'.dependencies]
|
||||
tikv-jemallocator = { optional = true, workspace = true }
|
||||
@@ -249,7 +250,6 @@ json-pointer = "^0"
|
||||
itertools = "^0"
|
||||
regex = "^1"
|
||||
semver = "^1"
|
||||
duckdb = { version = "^1.3.2", features = ["bundled"] }
|
||||
aws-sigv4 = "^1.3.4"
|
||||
aws-sdk-config = "=1.68.0"
|
||||
async-trait = "0.1.88"
|
||||
@@ -404,6 +404,7 @@ size = "0.5.0"
|
||||
flume = { version = "0.11.1", features = ["async"] }
|
||||
kube = { version = "1.1.0", features = ["runtime", "derive"] }
|
||||
k8s-openapi = { version = "0.25.0", features = ["latest"] }
|
||||
libloading = "0.8.8"
|
||||
|
||||
# Macro-related
|
||||
proc-macro2 = "1.0"
|
||||
|
||||
1532
backend/windmill-duckdb-ffi-internal/Cargo.lock
generated
Normal file
1532
backend/windmill-duckdb-ffi-internal/Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
14
backend/windmill-duckdb-ffi-internal/Cargo.toml
Normal file
14
backend/windmill-duckdb-ffi-internal/Cargo.toml
Normal file
@@ -0,0 +1,14 @@
|
||||
[package]
|
||||
name = "windmill_duckdb_ffi_internal"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
chrono = "0.4.41"
|
||||
duckdb = { version = "^1.3.2", features = ["bundled"] }
|
||||
rust_decimal = "1.37.2"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = { version = "^1", features = ["preserve_order", "raw_value"] }
|
||||
|
||||
[lib]
|
||||
crate-type = ["cdylib"]
|
||||
15
backend/windmill-duckdb-ffi-internal/README_DEV.md
Normal file
15
backend/windmill-duckdb-ffi-internal/README_DEV.md
Normal file
@@ -0,0 +1,15 @@
|
||||
This crate is compiled separately because it causes nasty issues when compiled with the deno_core feature flag enabled (lib c++ interactions).
|
||||
|
||||
The main issue was :
|
||||
Errors in DuckDB always worked correctly, except when attached to a Ducklake and when the deno_core feature flag was set.
|
||||
For example:
|
||||
|
||||
```sql
|
||||
ATTACH 'ducklake' AS dl; USE dl;
|
||||
CREATE TABLE IF NOT EXISTS t (x string not null);
|
||||
INSERT INTO t VALUES (NULL);
|
||||
```
|
||||
|
||||
causes `Constraint Error: NOT NULL constraint failed: t.x` normally, but here we see `Unknown exception in ExecutorTask::Execute`. This opaque errors comes directly from the C++ DuckDB library : https://github.com/duckdb/duckdb/blob/f99fed1e0b16a842573f9dad529f6c170a004f6e/src/parallel/executor_task.cpp#L58
|
||||
|
||||
To solve this, we compile duckdb separately from the main backend crate and call it with FFI
|
||||
5
backend/windmill-duckdb-ffi-internal/build.rs
Normal file
5
backend/windmill-duckdb-ffi-internal/build.rs
Normal file
@@ -0,0 +1,5 @@
|
||||
fn main() {
|
||||
// Duckdb requires Windows Restart Manager library on Windows
|
||||
#[cfg(target_os = "windows")]
|
||||
println!("cargo:rustc-link-lib=Rstrtmgr");
|
||||
}
|
||||
2
backend/windmill-duckdb-ffi-internal/build_dev.sh
Executable file
2
backend/windmill-duckdb-ffi-internal/build_dev.sh
Executable file
@@ -0,0 +1,2 @@
|
||||
cargo build --release -p windmill_duckdb_ffi_internal
|
||||
cp target/release/libwindmill_duckdb_ffi_internal.* ../target/debug/
|
||||
450
backend/windmill-duckdb-ffi-internal/src/lib.rs
Normal file
450
backend/windmill-duckdb-ffi-internal/src/lib.rs
Normal file
@@ -0,0 +1,450 @@
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
ffi::{c_char, CStr, CString},
|
||||
ptr::null_mut,
|
||||
};
|
||||
|
||||
use duckdb::{params_from_iter, types::TimeUnit, Row};
|
||||
use rust_decimal::{prelude::FromPrimitive, Decimal};
|
||||
use serde::Deserialize;
|
||||
use serde_json::value::RawValue;
|
||||
|
||||
#[derive(Deserialize, Clone, Debug, PartialEq, Default)]
|
||||
pub struct Arg {
|
||||
pub name: String,
|
||||
pub arg_type: String,
|
||||
pub json_value: serde_json::Value,
|
||||
}
|
||||
|
||||
#[unsafe(no_mangle)]
|
||||
pub extern "C" fn run_duckdb_ffi(
|
||||
query_block_list: *const *const c_char,
|
||||
query_block_list_count: usize,
|
||||
job_args: *const c_char,
|
||||
token: *const c_char,
|
||||
base_internal_url: *const c_char,
|
||||
w_id: *const c_char,
|
||||
column_order_ptr: *mut *mut c_char,
|
||||
) -> *mut c_char {
|
||||
let (r, column_order) = match convert_args(
|
||||
query_block_list,
|
||||
query_block_list_count,
|
||||
job_args,
|
||||
token,
|
||||
base_internal_url,
|
||||
w_id,
|
||||
)
|
||||
.and_then(
|
||||
|(query_block_list, job_args, token, base_internal_url, w_id)| {
|
||||
run_duckdb_internal(
|
||||
query_block_list,
|
||||
query_block_list_count,
|
||||
job_args,
|
||||
token,
|
||||
base_internal_url,
|
||||
w_id,
|
||||
)
|
||||
},
|
||||
) {
|
||||
Ok(result) => result,
|
||||
Err(err) => {
|
||||
let err = serde_json::to_string(&err)
|
||||
.unwrap_or_else(|_| "Unknown error in duckdb ffi lib".to_string());
|
||||
(format!("ERROR {}", err), None)
|
||||
}
|
||||
};
|
||||
|
||||
unsafe {
|
||||
if let Some(column_order) = column_order {
|
||||
let column_order =
|
||||
serde_json::to_string(&column_order).unwrap_or_else(|_| "[]".to_string());
|
||||
let c_column_order =
|
||||
CString::new(column_order).unwrap_or_else(|_| CString::new("[]").unwrap());
|
||||
*column_order_ptr = c_column_order.into_raw();
|
||||
} else {
|
||||
*column_order_ptr = null_mut();
|
||||
}
|
||||
}
|
||||
// CString::into_raw because it needs to outlive this function call.
|
||||
// The caller is responsible for freeing the memory through CString::from_raw.
|
||||
CString::new(r).map(|s| s.into_raw()).unwrap_or_else(|e| {
|
||||
println!("Failed to allocate error string in duckdb ffi lib: {:?}", e);
|
||||
null_mut()
|
||||
})
|
||||
}
|
||||
|
||||
fn convert_args<'a>(
|
||||
query_block_list: *const *const c_char,
|
||||
query_block_list_count: usize,
|
||||
job_args: *const c_char,
|
||||
token: *const c_char,
|
||||
base_internal_url: *const c_char,
|
||||
w_id: *const c_char,
|
||||
) -> Result<
|
||||
(
|
||||
impl Iterator<Item = &'a str>,
|
||||
HashMap<String, duckdb::types::Value>,
|
||||
&'a str,
|
||||
&'a str,
|
||||
&'a str,
|
||||
),
|
||||
String,
|
||||
> {
|
||||
let query_block_list = unsafe {
|
||||
std::slice::from_raw_parts(query_block_list, query_block_list_count)
|
||||
.iter()
|
||||
.map(|q| {
|
||||
CStr::from_ptr(*q).to_str().unwrap_or_else(|e| {
|
||||
println!(
|
||||
"Invalid query_block string pointer in duckdb ffi: {}",
|
||||
e.to_string()
|
||||
);
|
||||
"Invalid query_block string pointer in duckdb ffi"
|
||||
})
|
||||
})
|
||||
};
|
||||
let job_args_str = unsafe { CStr::from_ptr(job_args) }
|
||||
.to_str()
|
||||
.map_err(|e| format!("Invalid job_args string: {}", e.to_string()))?;
|
||||
let job_args: Vec<Arg> = serde_json::from_str(job_args_str)
|
||||
.map_err(|e| format!("Invalid job_args JSON: {}", e.to_string()))?;
|
||||
let job_args: HashMap<String, duckdb::types::Value> = job_args
|
||||
.into_iter()
|
||||
.map(|arg| {
|
||||
let duckdb_value = json_value_to_duckdb_value(&arg.json_value, &arg.arg_type)
|
||||
.unwrap_or_else(|e| {
|
||||
println!(
|
||||
"Error converting job_arg {} to duckdb value: {}",
|
||||
arg.name, e
|
||||
);
|
||||
duckdb::types::Value::Null
|
||||
});
|
||||
(arg.name, duckdb_value)
|
||||
})
|
||||
.collect();
|
||||
let token = unsafe { CStr::from_ptr(token) }
|
||||
.to_str()
|
||||
.map_err(|e| format!("Invalid token string: {}", e.to_string()))?;
|
||||
let base_internal_url = unsafe { CStr::from_ptr(base_internal_url) }
|
||||
.to_str()
|
||||
.map_err(|e| format!("Invalid base_internal_url string: {}", e.to_string()))?;
|
||||
let w_id = unsafe { CStr::from_ptr(w_id) }
|
||||
.to_str()
|
||||
.map_err(|e| format!("Invalid w_id string: {}", e.to_string()))?;
|
||||
Ok((query_block_list, job_args, token, base_internal_url, w_id))
|
||||
}
|
||||
|
||||
// TODO : Better error return to leverage different error kinds on the worker side
|
||||
fn run_duckdb_internal<'a>(
|
||||
query_block_list: impl Iterator<Item = &'a str>,
|
||||
query_block_list_count: usize,
|
||||
job_args: HashMap<String, duckdb::types::Value>,
|
||||
token: &str,
|
||||
base_internal_url: &str,
|
||||
w_id: &str,
|
||||
) -> Result<(String, Option<Vec<String>>), String> {
|
||||
let conn = duckdb::Connection::open_in_memory().map_err(|e| e.to_string())?;
|
||||
|
||||
let (s3_access_key, s3_secret_key) = token.split_at(token.rfind('.').unwrap_or(0));
|
||||
let s3_secret_key = &s3_secret_key[1..];
|
||||
let (s3_endpoint_ssl, s3_endpoint) = base_internal_url
|
||||
.split_once("://")
|
||||
.unwrap_or(("http", &base_internal_url));
|
||||
let s3_endpoint_ssl = match s3_endpoint_ssl {
|
||||
"https" => true,
|
||||
_ => false,
|
||||
};
|
||||
|
||||
conn.execute_batch(&format!(
|
||||
"INSTALL httpfs; LOAD httpfs;
|
||||
INSTALL azure; LOAD azure;
|
||||
CREATE OR REPLACE SECRET s3_secret (
|
||||
TYPE s3,
|
||||
PROVIDER config,
|
||||
KEY_ID '{s3_access_key}',
|
||||
SECRET '{s3_secret_key}',
|
||||
ENDPOINT '{s3_endpoint}/api/w/{w_id}/s3_proxy',
|
||||
URL_STYLE path,
|
||||
USE_SSL {s3_endpoint_ssl}
|
||||
);
|
||||
CREATE OR REPLACE SECRET gcs_secret (
|
||||
TYPE gcs,
|
||||
KEY_ID '{s3_access_key}',
|
||||
SECRET '{s3_secret_key}',
|
||||
ENDPOINT '{s3_endpoint}/api/w/{w_id}/s3_proxy',
|
||||
USE_SSL {s3_endpoint_ssl}
|
||||
);
|
||||
",
|
||||
))
|
||||
.map_err(|e| format!("Error setting up S3 secret: {}", e.to_string()))?;
|
||||
|
||||
let mut result: Option<Box<RawValue>> = None;
|
||||
let mut column_order = None;
|
||||
|
||||
for (query_block_index, query_block) in query_block_list.enumerate() {
|
||||
result = Some(
|
||||
do_duckdb_inner(
|
||||
&conn,
|
||||
query_block,
|
||||
&job_args,
|
||||
query_block_index != query_block_list_count - 1,
|
||||
&mut column_order,
|
||||
)
|
||||
.map_err(|e| e.to_string())?,
|
||||
);
|
||||
}
|
||||
let result = result.unwrap_or_else(|| RawValue::from_string("[]".to_string()).unwrap());
|
||||
Ok((result.get().to_string(), column_order))
|
||||
}
|
||||
|
||||
fn do_duckdb_inner(
|
||||
conn: &duckdb::Connection,
|
||||
query: &str,
|
||||
job_args: &HashMap<String, duckdb::types::Value>,
|
||||
skip_collect: bool,
|
||||
column_order: &mut Option<Vec<String>>,
|
||||
) -> Result<Box<RawValue>, String> {
|
||||
let mut rows_vec = vec![];
|
||||
|
||||
let (query, job_args) = interpolate_named_args(query, &job_args);
|
||||
|
||||
let mut stmt = conn.prepare(&query).map_err(|e| e.to_string())?;
|
||||
|
||||
let mut rows = stmt
|
||||
.query(params_from_iter(job_args))
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
if skip_collect {
|
||||
return Ok(RawValue::from_string("[]".to_string()).unwrap());
|
||||
}
|
||||
|
||||
// Statement needs to be stepped at least once or stmt.column_names() will panic
|
||||
let mut column_names = None;
|
||||
loop {
|
||||
let row = rows.next();
|
||||
match row {
|
||||
Ok(Some(row)) => {
|
||||
// Set column names if not already set
|
||||
let stmt = row.as_ref();
|
||||
let column_names = match column_names.as_ref() {
|
||||
Some(column_names) => column_names,
|
||||
None => {
|
||||
column_names = Some(stmt.column_names());
|
||||
column_names.as_ref().unwrap()
|
||||
}
|
||||
};
|
||||
|
||||
let row = row_to_value(row, &column_names.as_slice()).map_err(|e| e.to_string())?;
|
||||
rows_vec.push(row);
|
||||
}
|
||||
Ok(None) => break,
|
||||
Err(e) => {
|
||||
return Err(e.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
*column_order = column_names;
|
||||
|
||||
serde_json::value::to_raw_value(&rows_vec).map_err(|e| e.to_string())
|
||||
}
|
||||
|
||||
// duckdb-rs does not support named parameters,
|
||||
// and it raises an error when passing unused arguments. We cannot prepare batch statements
|
||||
// but only single SQL statements so it doesn't work when all arguments are not used by
|
||||
// every single statement.
|
||||
fn interpolate_named_args<'a>(
|
||||
query: &str,
|
||||
args: &'a HashMap<String, duckdb::types::Value>,
|
||||
) -> (String, Vec<&'a duckdb::types::Value>) {
|
||||
let mut query = query.to_string();
|
||||
|
||||
let mut values = vec![];
|
||||
for (arg_name, arg_value) in args {
|
||||
let pat = format!("${}", arg_name);
|
||||
if !query.contains(&pat) {
|
||||
continue;
|
||||
}
|
||||
values.push(arg_value);
|
||||
query = query.replace(&pat, &format!("${}", values.len()));
|
||||
}
|
||||
(query, values)
|
||||
}
|
||||
|
||||
fn row_to_value(row: &Row<'_>, column_names: &[String]) -> Result<Box<RawValue>, String> {
|
||||
let mut obj = serde_json::Map::new();
|
||||
for (i, key) in column_names.iter().enumerate() {
|
||||
let value: duckdb::types::Value = row.get(i).map_err(|e| e.to_string())?;
|
||||
let json_value = match value {
|
||||
duckdb::types::Value::Null => serde_json::Value::Null,
|
||||
duckdb::types::Value::Boolean(b) => serde_json::Value::Bool(b),
|
||||
duckdb::types::Value::TinyInt(i) => serde_json::Value::Number(i.into()),
|
||||
duckdb::types::Value::SmallInt(i) => serde_json::Value::Number(i.into()),
|
||||
duckdb::types::Value::Int(i) => serde_json::Value::Number(i.into()),
|
||||
duckdb::types::Value::BigInt(i) => serde_json::Value::Number(i.into()),
|
||||
duckdb::types::Value::HugeInt(i) => serde_json::Value::String(i.to_string()),
|
||||
duckdb::types::Value::UTinyInt(u) => serde_json::Value::Number(u.into()),
|
||||
duckdb::types::Value::USmallInt(u) => serde_json::Value::Number(u.into()),
|
||||
duckdb::types::Value::UInt(u) => serde_json::Value::Number(u.into()),
|
||||
duckdb::types::Value::UBigInt(u) => serde_json::Value::Number(u.into()),
|
||||
duckdb::types::Value::Float(f) => serde_json::Value::Number(
|
||||
serde_json::Number::from_f64(f as f64)
|
||||
.ok_or_else(|| ("Could not convert to f64".to_string()))?,
|
||||
),
|
||||
duckdb::types::Value::Double(f) => serde_json::Value::Number(
|
||||
serde_json::Number::from_f64(f)
|
||||
.ok_or_else(|| ("Could not convert to f64".to_string()))?,
|
||||
),
|
||||
duckdb::types::Value::Decimal(d) => serde_json::Value::String(d.to_string()),
|
||||
duckdb::types::Value::Timestamp(_, ts) => serde_json::Value::String(ts.to_string()),
|
||||
duckdb::types::Value::Text(s) => serde_json::Value::String(s),
|
||||
duckdb::types::Value::Blob(b) => serde_json::Value::Array(
|
||||
b.into_iter()
|
||||
.map(|byte| serde_json::Value::Number(byte.into()))
|
||||
.collect(),
|
||||
),
|
||||
duckdb::types::Value::Date32(d) => serde_json::Value::Number(d.into()),
|
||||
duckdb::types::Value::Time64(_, t) => serde_json::Value::String(t.to_string()),
|
||||
duckdb::types::Value::Interval { months, days, nanos } => serde_json::json!({
|
||||
"months": months,
|
||||
"days": days,
|
||||
"nanos": nanos
|
||||
}),
|
||||
duckdb::types::Value::List(values) => serde_json::Value::Array(
|
||||
values
|
||||
.into_iter()
|
||||
.map(|v| serde_json::Value::String(format!("{:?}", v)))
|
||||
.collect(),
|
||||
),
|
||||
duckdb::types::Value::Enum(e) => serde_json::Value::String(e),
|
||||
duckdb::types::Value::Struct(fields) => serde_json::Value::Object(
|
||||
fields
|
||||
.iter()
|
||||
.map(|(k, v)| (k.clone(), serde_json::Value::String(format!("{:?}", v))))
|
||||
.collect(),
|
||||
),
|
||||
duckdb::types::Value::Array(values) => serde_json::Value::Array(
|
||||
values
|
||||
.into_iter()
|
||||
.map(|v| serde_json::Value::String(format!("{:?}", v)))
|
||||
.collect(),
|
||||
),
|
||||
duckdb::types::Value::Map(map) => serde_json::Value::Object(
|
||||
map.iter()
|
||||
.map(|(k, v)| {
|
||||
(
|
||||
format!("{:?}", k),
|
||||
serde_json::Value::String(format!("{:?}", v)),
|
||||
)
|
||||
})
|
||||
.collect(),
|
||||
),
|
||||
duckdb::types::Value::Union(value) => {
|
||||
serde_json::Value::String(format!("{:?}", *value))
|
||||
}
|
||||
};
|
||||
obj.insert(key.clone(), json_value);
|
||||
}
|
||||
serde_json::value::to_raw_value(&obj).map_err(|e| e.to_string())
|
||||
}
|
||||
|
||||
fn json_value_to_duckdb_value(
|
||||
json_value: &serde_json::Value,
|
||||
arg_type: &str,
|
||||
) -> Result<duckdb::types::Value, String> {
|
||||
let arg_type = arg_type.to_lowercase();
|
||||
let duckdb_value = match json_value {
|
||||
serde_json::Value::Null => duckdb::types::Value::Null,
|
||||
serde_json::Value::Bool(b) => duckdb::types::Value::Boolean(*b),
|
||||
|
||||
serde_json::Value::String(s)
|
||||
if matches!(
|
||||
arg_type.as_str(),
|
||||
"timestamp" | "timestamptz" | "timestamp with time zone" | "datetime"
|
||||
) =>
|
||||
{
|
||||
string_to_duckdb_timestamp(&s)?
|
||||
}
|
||||
serde_json::Value::String(s) if arg_type.as_str() == "date" => string_to_duckdb_date(&s)?,
|
||||
serde_json::Value::String(s) if arg_type.as_str() == "time" => string_to_duckdb_time(&s)?,
|
||||
serde_json::Value::String(s) => duckdb::types::Value::Text(s.clone()),
|
||||
|
||||
serde_json::Value::Number(n) if n.is_i64() => {
|
||||
let v = n.as_i64().unwrap();
|
||||
match arg_type.as_str() {
|
||||
"tinyint" | "int1" => duckdb::types::Value::TinyInt(v as i8),
|
||||
"smallint" | "int2" | "short" => duckdb::types::Value::SmallInt(v as i16),
|
||||
"integer" | "int4" | "int" | "signed" => duckdb::types::Value::Int(v as i32),
|
||||
"bigint" | "int8" | "long" => duckdb::types::Value::BigInt(v),
|
||||
"hugeint" => duckdb::types::Value::HugeInt(v as i128),
|
||||
"float" | "float4" | "real" => duckdb::types::Value::Float(v as f32),
|
||||
"double" | "float8" => duckdb::types::Value::Double(v as f64),
|
||||
_ => duckdb::types::Value::BigInt(v), // default fallback
|
||||
}
|
||||
}
|
||||
|
||||
serde_json::Value::Number(n) if n.is_u64() => {
|
||||
let v = n.as_u64().unwrap();
|
||||
match arg_type.as_str() {
|
||||
"utinyint" => duckdb::types::Value::UTinyInt(v as u8),
|
||||
"usmallint" => duckdb::types::Value::USmallInt(v as u16),
|
||||
"uinteger" => duckdb::types::Value::UInt(v as u32),
|
||||
"ubigint" | "uhugeint" => duckdb::types::Value::UBigInt(v),
|
||||
_ => duckdb::types::Value::UBigInt(v), // default fallback
|
||||
}
|
||||
}
|
||||
|
||||
serde_json::Value::Number(n) if n.is_f64() => {
|
||||
let v = n.as_f64().unwrap();
|
||||
match arg_type.as_str() {
|
||||
"float" | "float4" | "real" => duckdb::types::Value::Float(v as f32),
|
||||
"double" | "float8" => duckdb::types::Value::Double(v),
|
||||
"decimal" | "numeric" => duckdb::types::Value::Decimal(
|
||||
Decimal::from_f64(v)
|
||||
.ok_or_else(|| ("Could not convert f64 to Decimal".to_string()))?,
|
||||
),
|
||||
_ => duckdb::types::Value::Double(v), // default fallback
|
||||
}
|
||||
}
|
||||
|
||||
serde_json::Value::Array(arr) => {
|
||||
duckdb::types::Value::Text(serde_json::to_string(arr).map_err(|e| e.to_string())?)
|
||||
}
|
||||
serde_json::Value::Object(map) => {
|
||||
duckdb::types::Value::Text(serde_json::to_string(map).map_err(|e| e.to_string())?)
|
||||
}
|
||||
|
||||
value @ _ => {
|
||||
return Err(format!(
|
||||
"Unsupported type in query: {:?} and signature {arg_type:?}",
|
||||
value
|
||||
));
|
||||
}
|
||||
};
|
||||
Ok(duckdb_value)
|
||||
}
|
||||
|
||||
fn string_to_duckdb_timestamp(s: &str) -> Result<duckdb::types::Value, String> {
|
||||
let ts =
|
||||
chrono::DateTime::parse_from_rfc3339(s).map_err(|e: chrono::ParseError| e.to_string())?;
|
||||
Ok(duckdb::types::Value::Timestamp(
|
||||
TimeUnit::Millisecond,
|
||||
ts.timestamp_millis(),
|
||||
))
|
||||
}
|
||||
|
||||
fn string_to_duckdb_date(s: &str) -> Result<duckdb::types::Value, String> {
|
||||
use chrono::Datelike;
|
||||
let date = chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d")
|
||||
.map_err(|e| (format!("Invalid date format: {}", e)))?;
|
||||
Ok(duckdb::types::Value::Date32(date.num_days_from_ce()))
|
||||
}
|
||||
|
||||
fn string_to_duckdb_time(s: &str) -> Result<duckdb::types::Value, String> {
|
||||
use chrono::Timelike;
|
||||
let time = chrono::NaiveTime::parse_from_str(s, "%H:%M:%S").unwrap();
|
||||
Ok(duckdb::types::Value::Time64(
|
||||
TimeUnit::Microsecond,
|
||||
time.num_seconds_from_midnight() as i64,
|
||||
))
|
||||
}
|
||||
@@ -33,7 +33,7 @@ rust = ["dep:windmill-parser-rust"]
|
||||
nu = ["dep:windmill-parser-nu"]
|
||||
java = ["dep:windmill-parser-java"]
|
||||
ruby = ["dep:windmill-parser-ruby"]
|
||||
duckdb = ["dep:duckdb"]
|
||||
duckdb = ["dep:libloading"]
|
||||
|
||||
[dependencies]
|
||||
windmill-queue.workspace = true
|
||||
@@ -97,7 +97,6 @@ deno_permissions = { workspace = true, optional = true }
|
||||
deno_io = { workspace = true, optional = true }
|
||||
deno_error = { workspace = true, optional = true }
|
||||
async-stream.workspace = true
|
||||
duckdb = { workspace = true, optional = true }
|
||||
|
||||
postgres-native-tls.workspace = true
|
||||
native-tls.workspace = true
|
||||
@@ -125,6 +124,7 @@ winapi = { workspace = true, optional = true }
|
||||
pep440_rs.workspace = true
|
||||
process-wrap.workspace = true
|
||||
async-once-cell.workspace = true
|
||||
libloading = { workspace = true, optional = true }
|
||||
|
||||
opentelemetry = { workspace = true, optional = true }
|
||||
bollard = { workspace = true, optional = true }
|
||||
|
||||
@@ -1,19 +1,18 @@
|
||||
use std::collections::HashMap;
|
||||
use std::cell::RefCell;
|
||||
use std::env;
|
||||
use std::ffi::{c_char, CString};
|
||||
use std::ptr::NonNull;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
use duckdb::types::TimeUnit;
|
||||
use duckdb::{params_from_iter, Row};
|
||||
use rust_decimal::prelude::FromPrimitive;
|
||||
use rust_decimal::Decimal;
|
||||
use libloading::{Library, Symbol};
|
||||
use serde::Serialize;
|
||||
use serde_json::value::RawValue;
|
||||
use serde_json::{json, Value};
|
||||
use tokio::task;
|
||||
use uuid::Uuid;
|
||||
use windmill_common::error::{to_anyhow, Error, Result};
|
||||
use windmill_common::s3_helpers::S3Object;
|
||||
use windmill_common::utils::sanitize_string_from_password;
|
||||
use windmill_common::worker::{to_raw_value, Connection};
|
||||
use windmill_common::worker::Connection;
|
||||
use windmill_common::workspaces::{get_ducklake_from_db_unchecked, DucklakeCatalogResourceType};
|
||||
use windmill_parser_sql::{parse_duckdb_sig, parse_sql_blocks};
|
||||
use windmill_queue::{CanceledBy, MiniPulledJob};
|
||||
@@ -27,63 +26,6 @@ use crate::pg_executor::PgDatabase;
|
||||
use crate::sanitized_sql_params::sanitize_and_interpolate_unsafe_sql_args;
|
||||
use windmill_common::client::AuthedClient;
|
||||
|
||||
fn do_duckdb_inner(
|
||||
conn: &duckdb::Connection,
|
||||
query: &str,
|
||||
job_args: &HashMap<String, duckdb::types::Value>,
|
||||
skip_collect: bool,
|
||||
column_order: &mut Option<Vec<String>>,
|
||||
) -> Result<Box<RawValue>> {
|
||||
let mut rows_vec = vec![];
|
||||
|
||||
let (query, job_args) = interpolate_named_args(query, &job_args);
|
||||
|
||||
let mut stmt = conn
|
||||
.prepare(&query)
|
||||
.map_err(|e| Error::ExecutionErr(e.to_string()))?;
|
||||
|
||||
let mut rows = stmt
|
||||
.query(params_from_iter(job_args))
|
||||
.map_err(|e| Error::ExecutionErr(e.to_string()))?;
|
||||
|
||||
if skip_collect {
|
||||
return Ok(to_raw_value(&json!([])));
|
||||
}
|
||||
|
||||
// Statement needs to be stepped at least once or stmt.column_names() will panic
|
||||
let mut column_names = None;
|
||||
loop {
|
||||
let row = rows.next();
|
||||
match row {
|
||||
Ok(Some(row)) => {
|
||||
// Set column names if not already set
|
||||
let stmt = row.as_ref();
|
||||
let column_names = match column_names.as_ref() {
|
||||
Some(column_names) => column_names,
|
||||
None => {
|
||||
column_names = Some(stmt.column_names());
|
||||
column_names.as_ref().unwrap()
|
||||
}
|
||||
};
|
||||
|
||||
let row = row_to_value(row, &column_names.as_slice())
|
||||
.map_err(|e| Error::ExecutionErr(e.to_string()))?;
|
||||
rows_vec.push(row);
|
||||
}
|
||||
Ok(None) => break,
|
||||
Err(e) => {
|
||||
return Err(Error::ExecutionErr(e.to_string()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let (Some(column_order), Some(column_names)) = (column_order.as_mut(), column_names) {
|
||||
*column_order = column_names.clone();
|
||||
}
|
||||
|
||||
return Ok(to_raw_value(&rows_vec));
|
||||
}
|
||||
|
||||
pub async fn do_duckdb(
|
||||
job: &MiniPulledJob,
|
||||
client: &AuthedClient,
|
||||
@@ -92,7 +34,8 @@ pub async fn do_duckdb(
|
||||
mem_peak: &mut i32,
|
||||
canceled_by: &mut Option<CanceledBy>,
|
||||
worker_name: &str,
|
||||
column_order_ref: &mut Option<Vec<String>>,
|
||||
// TODO
|
||||
#[allow(unused_variables)] column_order_ref: &mut Option<Vec<String>>,
|
||||
occupancy_metrics: &mut OccupancyMetrics,
|
||||
) -> Result<Box<RawValue>> {
|
||||
let token = client.token.clone();
|
||||
@@ -109,7 +52,7 @@ pub async fn do_duckdb(
|
||||
let query = transform_s3_uris(query).await?;
|
||||
|
||||
let job_args = {
|
||||
let mut m: HashMap<String, duckdb::types::Value> = HashMap::new();
|
||||
let mut m = Vec::new();
|
||||
for sig_arg in sig.into_iter() {
|
||||
let json_value = job_args
|
||||
.remove(&sig_arg.name)
|
||||
@@ -125,17 +68,17 @@ pub async fn do_duckdb(
|
||||
s3_obj.storage.as_deref().unwrap_or("_default_"),
|
||||
s3_obj.s3
|
||||
);
|
||||
m.insert(sig_arg.name, duckdb::types::Value::Text(uri));
|
||||
m.push(Arg {
|
||||
json_value: serde_json::Value::String(uri),
|
||||
name: sig_arg.name,
|
||||
arg_type: "string".to_string(),
|
||||
});
|
||||
} else {
|
||||
let duckdb_value = json_value_to_duckdb_value(
|
||||
&json_value,
|
||||
sig_arg
|
||||
.otyp
|
||||
.clone()
|
||||
.unwrap_or_else(|| "text".to_string())
|
||||
.as_str(),
|
||||
)?;
|
||||
m.insert(sig_arg.name, duckdb_value);
|
||||
m.push(Arg {
|
||||
json_value,
|
||||
name: sig_arg.name,
|
||||
arg_type: sig_arg.otyp.unwrap_or_else(|| "text".to_string()),
|
||||
});
|
||||
}
|
||||
}
|
||||
m
|
||||
@@ -185,63 +128,15 @@ pub async fn do_duckdb(
|
||||
let base_internal_url = client.base_internal_url.clone();
|
||||
let w_id = job.workspace_id.clone();
|
||||
|
||||
// duckdb::Connection is not Send so we run the queries in a single blocking task
|
||||
let (result, column_order) = task::spawn_blocking(move || {
|
||||
let conn = duckdb::Connection::open_in_memory()
|
||||
.map_err(|e| Error::ConnectingToDatabase(e.to_string()))?;
|
||||
|
||||
let (s3_access_key, s3_secret_key) = token.split_at(token.rfind('.').unwrap_or(0));
|
||||
let s3_secret_key = &s3_secret_key[1..];
|
||||
let (s3_endpoint_ssl, s3_endpoint) = base_internal_url
|
||||
.split_once("://")
|
||||
.unwrap_or(("http", &base_internal_url));
|
||||
let s3_endpoint_ssl = match s3_endpoint_ssl {
|
||||
"https" => true,
|
||||
_ => false,
|
||||
};
|
||||
|
||||
conn.execute_batch(&format!(
|
||||
"INSTALL httpfs; LOAD httpfs;
|
||||
INSTALL azure; LOAD azure;
|
||||
CREATE OR REPLACE SECRET s3_secret (
|
||||
TYPE s3,
|
||||
PROVIDER config,
|
||||
KEY_ID '{s3_access_key}',
|
||||
SECRET '{s3_secret_key}',
|
||||
ENDPOINT '{s3_endpoint}/api/w/{w_id}/s3_proxy',
|
||||
URL_STYLE path,
|
||||
USE_SSL {s3_endpoint_ssl}
|
||||
);
|
||||
CREATE OR REPLACE SECRET gcs_secret (
|
||||
TYPE gcs,
|
||||
KEY_ID '{s3_access_key}',
|
||||
SECRET '{s3_secret_key}',
|
||||
ENDPOINT '{s3_endpoint}/api/w/{w_id}/s3_proxy',
|
||||
USE_SSL {s3_endpoint_ssl}
|
||||
);
|
||||
",
|
||||
))
|
||||
.map_err(|e| {
|
||||
Error::ExecutionErr(format!("Error setting up S3 secret: {}", e.to_string()))
|
||||
})?;
|
||||
|
||||
let mut result: Option<Box<RawValue>> = None;
|
||||
let mut column_order = None;
|
||||
|
||||
for (query_block_index, query_block) in query_block_list.iter().enumerate() {
|
||||
result = Some(
|
||||
do_duckdb_inner(
|
||||
&conn,
|
||||
query_block.as_str(),
|
||||
&job_args,
|
||||
query_block_index != query_block_list.len() - 1,
|
||||
&mut column_order,
|
||||
let (result, column_order) = tokio::task::spawn_blocking(move || {
|
||||
run_duckdb_ffi_safe(
|
||||
query_block_list.iter().map(String::as_str),
|
||||
query_block_list.len(),
|
||||
job_args,
|
||||
&token,
|
||||
&base_internal_url,
|
||||
&w_id,
|
||||
)
|
||||
.map_err(|e| Error::ExecutionErr(e.to_string()))?,
|
||||
);
|
||||
}
|
||||
let result = result.unwrap_or_else(|| to_raw_value(&json!([])));
|
||||
Ok::<_, Error>((result, column_order))
|
||||
})
|
||||
.await
|
||||
.map_err(to_anyhow)??;
|
||||
@@ -281,184 +176,132 @@ pub async fn do_duckdb(
|
||||
}
|
||||
}
|
||||
|
||||
fn row_to_value(row: &Row<'_>, column_names: &[String]) -> Result<Box<RawValue>> {
|
||||
let mut obj = serde_json::Map::new();
|
||||
for (i, key) in column_names.iter().enumerate() {
|
||||
let value: duckdb::types::Value =
|
||||
row.get(i).map_err(|e| Error::ExecutionErr(e.to_string()))?;
|
||||
let json_value = match value {
|
||||
duckdb::types::Value::Null => serde_json::Value::Null,
|
||||
duckdb::types::Value::Boolean(b) => serde_json::Value::Bool(b),
|
||||
duckdb::types::Value::TinyInt(i) => serde_json::Value::Number(i.into()),
|
||||
duckdb::types::Value::SmallInt(i) => serde_json::Value::Number(i.into()),
|
||||
duckdb::types::Value::Int(i) => serde_json::Value::Number(i.into()),
|
||||
duckdb::types::Value::BigInt(i) => serde_json::Value::Number(i.into()),
|
||||
duckdb::types::Value::HugeInt(i) => serde_json::Value::String(i.to_string()),
|
||||
duckdb::types::Value::UTinyInt(u) => serde_json::Value::Number(u.into()),
|
||||
duckdb::types::Value::USmallInt(u) => serde_json::Value::Number(u.into()),
|
||||
duckdb::types::Value::UInt(u) => serde_json::Value::Number(u.into()),
|
||||
duckdb::types::Value::UBigInt(u) => serde_json::Value::Number(u.into()),
|
||||
duckdb::types::Value::Float(f) => serde_json::Value::Number(
|
||||
serde_json::Number::from_f64(f as f64)
|
||||
.ok_or_else(|| Error::ExecutionErr("Could not convert to f64".to_string()))?,
|
||||
),
|
||||
duckdb::types::Value::Double(f) => serde_json::Value::Number(
|
||||
serde_json::Number::from_f64(f)
|
||||
.ok_or_else(|| Error::ExecutionErr("Could not convert to f64".to_string()))?,
|
||||
),
|
||||
duckdb::types::Value::Decimal(d) => serde_json::Value::String(d.to_string()),
|
||||
duckdb::types::Value::Timestamp(_, ts) => serde_json::Value::String(ts.to_string()),
|
||||
duckdb::types::Value::Text(s) => serde_json::Value::String(s),
|
||||
duckdb::types::Value::Blob(b) => serde_json::Value::Array(
|
||||
b.into_iter()
|
||||
.map(|byte| serde_json::Value::Number(byte.into()))
|
||||
.collect(),
|
||||
),
|
||||
duckdb::types::Value::Date32(d) => serde_json::Value::Number(d.into()),
|
||||
duckdb::types::Value::Time64(_, t) => serde_json::Value::String(t.to_string()),
|
||||
duckdb::types::Value::Interval { months, days, nanos } => serde_json::json!({
|
||||
"months": months,
|
||||
"days": days,
|
||||
"nanos": nanos
|
||||
}),
|
||||
duckdb::types::Value::List(values) => serde_json::Value::Array(
|
||||
values
|
||||
.into_iter()
|
||||
.map(|v| serde_json::Value::String(format!("{:?}", v)))
|
||||
.collect(),
|
||||
),
|
||||
duckdb::types::Value::Enum(e) => serde_json::Value::String(e),
|
||||
duckdb::types::Value::Struct(fields) => serde_json::Value::Object(
|
||||
fields
|
||||
.iter()
|
||||
.map(|(k, v)| (k.clone(), serde_json::Value::String(format!("{:?}", v))))
|
||||
.collect(),
|
||||
),
|
||||
duckdb::types::Value::Array(values) => serde_json::Value::Array(
|
||||
values
|
||||
.into_iter()
|
||||
.map(|v| serde_json::Value::String(format!("{:?}", v)))
|
||||
.collect(),
|
||||
),
|
||||
duckdb::types::Value::Map(map) => serde_json::Value::Object(
|
||||
map.iter()
|
||||
.map(|(k, v)| {
|
||||
(
|
||||
format!("{:?}", k),
|
||||
serde_json::Value::String(format!("{:?}", v)),
|
||||
)
|
||||
thread_local! {
|
||||
static DUCKDB_FFI_LIB_SINGLETON: RefCell<*const DuckDbFfiLib> = RefCell::new(std::ptr::null());
|
||||
}
|
||||
|
||||
struct DuckDbFfiLib {
|
||||
run_duckdb_ffi: Symbol<
|
||||
'static,
|
||||
unsafe extern "C" fn(
|
||||
query_block_list: *const *const c_char,
|
||||
query_block_list_count: usize,
|
||||
job_args: *const c_char,
|
||||
token: *const c_char,
|
||||
base_internal_url: *const c_char,
|
||||
w_id: *const c_char,
|
||||
column_order_ptr: *mut *mut c_char,
|
||||
) -> *mut c_char,
|
||||
>,
|
||||
}
|
||||
|
||||
impl DuckDbFfiLib {
|
||||
fn get_singleton() -> Result<&'static DuckDbFfiLib> {
|
||||
DUCKDB_FFI_LIB_SINGLETON.with(|cell| unsafe {
|
||||
let mut singleton = cell.borrow_mut();
|
||||
if singleton.is_null() {
|
||||
let lib = DuckDbFfiLib::init()?;
|
||||
let boxed_lib = Box::new(lib);
|
||||
let lib_ptr = Box::leak(boxed_lib);
|
||||
*singleton = lib_ptr as *const _;
|
||||
Ok(NonNull::new_unchecked(*singleton as *mut DuckDbFfiLib).as_ref())
|
||||
} else {
|
||||
Ok(&**singleton)
|
||||
}
|
||||
})
|
||||
.collect(),
|
||||
),
|
||||
duckdb::types::Value::Union(value) => {
|
||||
serde_json::Value::String(format!("{:?}", *value))
|
||||
}
|
||||
};
|
||||
obj.insert(key.clone(), json_value);
|
||||
}
|
||||
serde_json::value::to_raw_value(&obj).map_err(|e| e.into())
|
||||
}
|
||||
|
||||
fn json_value_to_duckdb_value(
|
||||
json_value: &serde_json::Value,
|
||||
arg_type: &str,
|
||||
) -> Result<duckdb::types::Value> {
|
||||
let arg_type = arg_type.to_lowercase();
|
||||
let duckdb_value = match json_value {
|
||||
serde_json::Value::Null => duckdb::types::Value::Null,
|
||||
serde_json::Value::Bool(b) => duckdb::types::Value::Boolean(*b),
|
||||
|
||||
serde_json::Value::String(s)
|
||||
if matches!(
|
||||
arg_type.as_str(),
|
||||
"timestamp" | "timestamptz" | "timestamp with time zone" | "datetime"
|
||||
) =>
|
||||
{
|
||||
string_to_duckdb_timestamp(&s)?
|
||||
}
|
||||
serde_json::Value::String(s) if arg_type.as_str() == "date" => string_to_duckdb_date(&s)?,
|
||||
serde_json::Value::String(s) if arg_type.as_str() == "time" => string_to_duckdb_time(&s)?,
|
||||
serde_json::Value::String(s) => duckdb::types::Value::Text(s.clone()),
|
||||
|
||||
serde_json::Value::Number(n) if n.is_i64() => {
|
||||
let v = n.as_i64().unwrap();
|
||||
match arg_type.as_str() {
|
||||
"tinyint" | "int1" => duckdb::types::Value::TinyInt(v as i8),
|
||||
"smallint" | "int2" | "short" => duckdb::types::Value::SmallInt(v as i16),
|
||||
"integer" | "int4" | "int" | "signed" => duckdb::types::Value::Int(v as i32),
|
||||
"bigint" | "int8" | "long" => duckdb::types::Value::BigInt(v),
|
||||
"hugeint" => duckdb::types::Value::HugeInt(v as i128),
|
||||
"float" | "float4" | "real" => duckdb::types::Value::Float(v as f32),
|
||||
"double" | "float8" => duckdb::types::Value::Double(v as f64),
|
||||
_ => duckdb::types::Value::BigInt(v), // default fallback
|
||||
}
|
||||
}
|
||||
|
||||
serde_json::Value::Number(n) if n.is_u64() => {
|
||||
let v = n.as_u64().unwrap();
|
||||
match arg_type.as_str() {
|
||||
"utinyint" => duckdb::types::Value::UTinyInt(v as u8),
|
||||
"usmallint" => duckdb::types::Value::USmallInt(v as u16),
|
||||
"uinteger" => duckdb::types::Value::UInt(v as u32),
|
||||
"ubigint" | "uhugeint" => duckdb::types::Value::UBigInt(v),
|
||||
_ => duckdb::types::Value::UBigInt(v), // default fallback
|
||||
}
|
||||
}
|
||||
|
||||
serde_json::Value::Number(n) if n.is_f64() => {
|
||||
let v = n.as_f64().unwrap();
|
||||
match arg_type.as_str() {
|
||||
"float" | "float4" | "real" => duckdb::types::Value::Float(v as f32),
|
||||
"double" | "float8" => duckdb::types::Value::Double(v),
|
||||
"decimal" | "numeric" => {
|
||||
duckdb::types::Value::Decimal(Decimal::from_f64(v).ok_or_else(|| {
|
||||
Error::ExecutionErr("Could not convert f64 to Decimal".to_string())
|
||||
})?)
|
||||
}
|
||||
_ => duckdb::types::Value::Double(v), // default fallback
|
||||
}
|
||||
}
|
||||
|
||||
serde_json::Value::Array(arr) => {
|
||||
duckdb::types::Value::Text(serde_json::to_string(arr).map_err(to_anyhow)?)
|
||||
}
|
||||
serde_json::Value::Object(map) => {
|
||||
duckdb::types::Value::Text(serde_json::to_string(map).map_err(to_anyhow)?)
|
||||
}
|
||||
|
||||
value @ _ => {
|
||||
return Err(Error::ExecutionErr(format!(
|
||||
"Unsupported type in query: {:?} and signature {arg_type:?}",
|
||||
value
|
||||
)))
|
||||
}
|
||||
};
|
||||
Ok(duckdb_value)
|
||||
}
|
||||
|
||||
fn string_to_duckdb_timestamp(s: &str) -> Result<duckdb::types::Value> {
|
||||
let ts = chrono::DateTime::parse_from_rfc3339(s)
|
||||
.map_err(|e: chrono::ParseError| Error::ExecutionErr(e.to_string()))?;
|
||||
Ok(duckdb::types::Value::Timestamp(
|
||||
TimeUnit::Millisecond,
|
||||
ts.timestamp_millis(),
|
||||
fn init() -> Result<Self> {
|
||||
let lib = unsafe {
|
||||
Library::new(if cfg!(target_os = "macos") {
|
||||
"libwindmill_duckdb_ffi_internal.dylib"
|
||||
} else if cfg!(target_os = "windows") {
|
||||
"windmill_duckdb_ffi_internal.dll"
|
||||
} else {
|
||||
"libwindmill_duckdb_ffi_internal.so"
|
||||
})
|
||||
.map_err(|e| {
|
||||
Error::InternalErr(format!(
|
||||
"Could not init duckdb. Make sure you have the latest windmill_duckdb_ffi_lib.{} alongside your binary : https://github.com/windmill-labs/windmill/releases \n{}",
|
||||
if cfg!(target_os = "macos") { "dylib" }
|
||||
else if cfg!(target_os = "windows") { "dll" }
|
||||
else { "so" },
|
||||
e.to_string()
|
||||
))
|
||||
})?
|
||||
};
|
||||
let lib = Box::leak(Box::new(lib));
|
||||
Ok(DuckDbFfiLib {
|
||||
run_duckdb_ffi: unsafe { lib.get(b"run_duckdb_ffi").map_err(to_anyhow)? },
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn string_to_duckdb_date(s: &str) -> Result<duckdb::types::Value> {
|
||||
use chrono::Datelike;
|
||||
let date = chrono::NaiveDate::parse_from_str(s, "%Y-%m-%d")
|
||||
.map_err(|e| Error::ExecutionErr(format!("Invalid date format: {}", e)))?;
|
||||
Ok(duckdb::types::Value::Date32(date.num_days_from_ce()))
|
||||
}
|
||||
// Read backend/windmill-duckdb-ffi-internal/README_DEV.md for details about why we use FFI
|
||||
fn run_duckdb_ffi_safe<'a>(
|
||||
query_block_list: impl Iterator<Item = &'a str>,
|
||||
query_block_list_count: usize,
|
||||
job_args: Vec<Arg>,
|
||||
token: &str,
|
||||
base_internal_url: &str,
|
||||
w_id: &str,
|
||||
) -> Result<(Box<RawValue>, Option<Vec<String>>)> {
|
||||
let query_block_list = query_block_list
|
||||
.map(|s| {
|
||||
CString::new(s).map_err(|e| {
|
||||
Error::ExecutionErr(format!("Failed CString conversion: {}", e.to_string()))
|
||||
})
|
||||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
let query_block_list = query_block_list
|
||||
.iter()
|
||||
.map(|s| s.as_ptr())
|
||||
.collect::<Vec<_>>();
|
||||
let job_args = serde_json::to_string(&job_args).map_err(to_anyhow)?;
|
||||
|
||||
fn string_to_duckdb_time(s: &str) -> Result<duckdb::types::Value> {
|
||||
use chrono::Timelike;
|
||||
let time = chrono::NaiveTime::parse_from_str(s, "%H:%M:%S").unwrap();
|
||||
Ok(duckdb::types::Value::Time64(
|
||||
TimeUnit::Microsecond,
|
||||
time.num_seconds_from_midnight() as i64,
|
||||
let job_args = CString::new(job_args).map_err(to_anyhow)?;
|
||||
let token = CString::new(token).map_err(to_anyhow)?;
|
||||
let base_internal_url = CString::new(base_internal_url).map_err(to_anyhow)?;
|
||||
let w_id = CString::new(w_id).map_err(to_anyhow)?;
|
||||
|
||||
let run_duckdb_ffi = &DuckDbFfiLib::get_singleton()?.run_duckdb_ffi;
|
||||
let mut column_order: *mut c_char = std::ptr::null_mut();
|
||||
let result_cstr = unsafe {
|
||||
let ptr = run_duckdb_ffi(
|
||||
query_block_list.as_ptr(),
|
||||
query_block_list_count,
|
||||
job_args.as_ptr(),
|
||||
token.as_ptr(),
|
||||
base_internal_url.as_ptr(),
|
||||
w_id.as_ptr(),
|
||||
&mut column_order,
|
||||
);
|
||||
CString::from_raw(ptr) // Using from_raw to take ownership and ensure it gets freed
|
||||
};
|
||||
|
||||
let column_order = if column_order.is_null() {
|
||||
None
|
||||
} else {
|
||||
Some(unsafe {
|
||||
serde_json::from_str::<Vec<String>>(&CString::from_raw(column_order).to_string_lossy())?
|
||||
})
|
||||
};
|
||||
|
||||
let result_str = result_cstr
|
||||
.to_str()
|
||||
.map_err(|e| {
|
||||
Error::ExecutionErr(format!(
|
||||
"Failed to convert result C string to Rust string: {}",
|
||||
e.to_string()
|
||||
))
|
||||
})?
|
||||
.to_string();
|
||||
|
||||
if result_str.starts_with("ERROR") {
|
||||
Err(Error::ExecutionErr(result_str[6..].to_string()))
|
||||
} else {
|
||||
let result = serde_json::value::RawValue::from_string(result_str).map_err(to_anyhow)?;
|
||||
Ok((result, column_order))
|
||||
}
|
||||
}
|
||||
|
||||
struct ParsedAttachDbResource<'a> {
|
||||
@@ -699,26 +542,12 @@ impl Drop for UseBigQueryCredentialsFile {
|
||||
}
|
||||
}
|
||||
|
||||
// duckdb-rs does not support named parameters,
|
||||
// and it raises an error when passing unused arguments. We cannot prepare batch statements
|
||||
// but only single SQL statements so it doesn't work when all arguments are not used by
|
||||
// every single statement.
|
||||
fn interpolate_named_args<'a>(
|
||||
query: &str,
|
||||
args: &'a HashMap<String, duckdb::types::Value>,
|
||||
) -> (String, Vec<&'a duckdb::types::Value>) {
|
||||
let mut query = query.to_string();
|
||||
|
||||
let mut values = vec![];
|
||||
for (arg_name, arg_value) in args {
|
||||
let pat = format!("${}", arg_name);
|
||||
if !query.contains(&pat) {
|
||||
continue;
|
||||
}
|
||||
values.push(arg_value);
|
||||
query = query.replace(&pat, &format!("${}", values.len()));
|
||||
}
|
||||
(query, values)
|
||||
// Shared with ffi module
|
||||
#[derive(Serialize, Clone, Debug, PartialEq, Default)]
|
||||
pub struct Arg {
|
||||
pub name: String,
|
||||
pub arg_type: String,
|
||||
pub json_value: serde_json::Value,
|
||||
}
|
||||
|
||||
// input should contain a single statement. remove all comments before and after it
|
||||
|
||||
@@ -175,7 +175,7 @@
|
||||
'csharp',
|
||||
'nu',
|
||||
'java',
|
||||
'ruby',
|
||||
'ruby'
|
||||
// for related places search: ADD_NEW_LANG
|
||||
].includes(lang ?? '')
|
||||
)
|
||||
@@ -715,10 +715,7 @@ JsonNode ${windmillPathToCamelCaseName(path)} = JsonNode.Parse(await client.GetS
|
||||
on:selectAndClose={(s3obj) => {
|
||||
let s = `'${formatS3Object(s3obj.detail)}'`
|
||||
if (lang === 'duckdb') {
|
||||
if (s3obj.detail?.s3.endsWith('.json')) s = `read_json(${s})`
|
||||
if (s3obj.detail?.s3.endsWith('.csv')) s = `read_csv(${s})`
|
||||
if (s3obj.detail?.s3.endsWith('.parquet')) s = `read_parquet(${s})`
|
||||
editor?.insertAtCursor(s)
|
||||
editor?.insertAtCursor(`SELECT * FROM ${s}`)
|
||||
} else if (lang === 'python3') {
|
||||
if (!editor?.getCode().includes('import wmill')) {
|
||||
editor?.insertAtBeginning('import wmill\n')
|
||||
|
||||
Reference in New Issue
Block a user