oracle support for s3 streaming + misc (#5738)

* changed SQL templates to include S3

* oracle support for s3 streaming
This commit is contained in:
Diego Imbert
2025-05-14 15:45:00 +02:00
committed by GitHub
parent 62221d81ae
commit 669a95c3be
4 changed files with 106 additions and 48 deletions

1
backend/Cargo.lock generated
View File

@@ -15027,6 +15027,7 @@ dependencies = [
"tiberius",
"tokio",
"tokio-postgres 0.7.13",
"tokio-stream",
"tokio-util",
"tracing",
"url",

View File

@@ -57,6 +57,7 @@ sqlx.workspace = true
uuid.workspace = true
tracing.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
serde.workspace = true
serde_json.workspace = true
futures.workspace = true

View File

@@ -1,26 +1,31 @@
use anyhow::anyhow;
use chrono::Utc;
use std::{collections::HashMap, str::FromStr, sync::Arc};
use std::{collections::HashMap, str::FromStr, sync::Arc, vec};
use windmill_parser::Arg;
use futures::{future::BoxFuture, FutureExt};
use futures::{future::BoxFuture, FutureExt, StreamExt};
use itertools::Itertools;
use oracle::sql_type::{InnerValue, OracleType, ToSql};
use serde::{Deserialize, Serialize};
use serde_json::{json, value::RawValue, Value};
use windmill_common::{
error::{to_anyhow, Error},
s3_helpers::convert_json_line_stream,
worker::{to_raw_value, Connection},
};
use windmill_queue::MiniPulledJob;
use windmill_parser_sql::{
parse_db_resource, parse_oracledb_sig, parse_sql_blocks, parse_sql_statement_named_params,
parse_db_resource, parse_oracledb_sig, parse_s3_mode, parse_sql_blocks,
parse_sql_statement_named_params,
};
use windmill_queue::CanceledBy;
use crate::{
common::{build_args_values, check_executor_binary_exists, OccupancyMetrics},
common::{
build_args_values, check_executor_binary_exists, s3_mode_args_to_worker_data,
OccupancyMetrics, S3ModeWorkerData,
},
handle_child::run_future_with_polling_update_job_poller,
sanitized_sql_params::sanitize_and_interpolate_unsafe_sql_args,
AuthedClient,
@@ -43,6 +48,7 @@ pub fn do_oracledb_inner<'a>(
conn: Arc<std::sync::Mutex<oracle::Connection>>,
column_order: Option<&'a mut Option<Vec<String>>>,
skip_collect: bool,
s3: Option<S3ModeWorkerData>,
) -> windmill_common::error::Result<BoxFuture<'a, windmill_common::error::Result<Box<RawValue>>>> {
let qw = query.trim_end_matches(';').to_string();
@@ -81,55 +87,90 @@ pub fn do_oracledb_inner<'a>(
Ok(to_raw_value(&Value::Array(vec![])))
} else {
let rows = tokio::task::spawn_blocking(move || {
let params2: Vec<(&str, &dyn ToSql)> = params
.iter()
.filter(|(k, _)| param_names.contains(&k.clone().into_bytes()))
.map(|(key, val)| (key.as_str(), &**val as &dyn ToSql))
.collect();
// We use an mpsc because we need an async stream for s3 mode. However since everything is sync
// in rust-oracle, I assumed that calling ResultSet::next() is blocking when it has to refetch.
let (tx, rx) = tokio::sync::mpsc::channel::<oracle::Result<Value>>(1000);
let (column_order_oneshot_tx, column_order_oneshot_rx) =
tokio::sync::oneshot::channel::<Option<Vec<String>>>();
let mut column_order_oneshot_tx = Some(column_order_oneshot_tx);
let rows_stream = tokio_stream::wrappers::ReceiverStream::new(rx);
tokio::task::spawn_blocking(move || {
let result = (|| {
let tx = tx.clone();
let params2: Vec<(&str, &dyn ToSql)> = params
.iter()
.filter(|(k, _)| param_names.contains(&k.clone().into_bytes()))
.map(|(key, val)| (key.as_str(), &**val as &dyn ToSql))
.collect();
let c = conn.lock()?;
let mut stmt = c.statement(&qw).build()?;
let c = conn.lock()?;
let mut stmt = c.statement(&qw).build()?;
let rows = match stmt.statement_type() {
oracle::StatementType::Select => {
let result_rows = stmt.query_named(&params2)?;
let rows: Vec<oracle::Row> =
result_rows.into_iter().filter_map(Result::ok).collect_vec();
rows
}
_ => {
stmt.execute_named(&params2)?;
c.commit()?;
vec![]
}
};
match stmt.statement_type() {
oracle::StatementType::Select => {
let mut result_rows = stmt.query_named(&params2)?.enumerate();
while let Some((i, row)) = result_rows.next() {
match row {
Ok(row) => {
// If first row, infer column order and send it to the channel
if i == 0 {
let col_order: Vec<String> = row
.column_info()
.iter()
.map(|x| x.name().to_string())
.collect::<Vec<String>>();
let _ = column_order_oneshot_tx
.take()
.unwrap()
.send(Some(col_order));
}
oracle::Result::Ok(rows)
})
.await
.map_err(to_anyhow)?
.map_err(to_anyhow)?;
// called in a spawn_blocking synchronous context, unwrap won't panic
tx.blocking_send(Ok(convert_row_to_value(row))).unwrap()
}
Err(e) => {
tx.blocking_send(Err(e)).unwrap();
break;
}
}
}
}
_ => {
stmt.execute_named(&params2)?;
c.commit()?;
}
};
drop(column_order_oneshot_tx);
Ok::<_, oracle::Error>(())
})();
match result {
Ok(_) => {}
Err(e) => tx.blocking_send(Err(e)).unwrap(),
}
// all instances of tx should be dropped here
});
if let Some(column_order) = column_order {
*column_order = Some(
rows.first()
.map(|x| {
x.column_info()
.iter()
.map(|x| x.name().to_string())
.collect::<Vec<String>>()
})
.unwrap_or_default(),
);
if let Ok(Some(col_order)) = column_order_oneshot_rx.await {
if let Some(column_order) = column_order {
*column_order = Some(col_order);
}
}
Ok(to_raw_value(
&rows
.into_iter()
.map(|x| convert_row_to_value(x))
.collect::<Vec<serde_json::Value>>(),
))
if let Some(s3) = s3 {
let stream = convert_json_line_stream(rows_stream.boxed(), s3.format).await?;
s3.upload(stream.boxed()).await?;
return Ok(serde_json::value::to_raw_value(&s3.object_key)?);
} else {
let rows: Vec<_> = rows_stream.collect().await;
Ok(to_raw_value(
&rows
.into_iter()
.collect::<Result<Vec<_>, _>>()
.map_err(to_anyhow)?
.into_iter()
.collect::<Vec<serde_json::Value>>(),
))
}
}
};
@@ -312,6 +353,7 @@ pub async fn do_oracledb(
let job_args = build_args_values(job, client, conn).await?;
let inline_db_res_path = parse_db_resource(&query);
let s3 = parse_s3_mode(&query)?.map(|s3| s3_mode_args_to_worker_data(s3, client.clone(), job));
let db_arg = if let Some(inline_db_res_path) = inline_db_res_path {
Some(
@@ -376,6 +418,7 @@ pub async fn do_oracledb(
conn_a.clone(),
None,
annotations.return_last_result && i < queries.len() - 1,
s3.clone(),
)?
.await?;
res.push(r);
@@ -390,7 +433,14 @@ pub async fn do_oracledb(
f.boxed()
} else {
do_oracledb_inner(&query, statement_values, conn_a, Some(column_order), false)?
do_oracledb_inner(
&query,
statement_values,
conn_a,
Some(column_order),
false,
s3,
)?
};
let result = run_future_with_polling_update_job_poller(

View File

@@ -249,6 +249,7 @@ export async function main(message: string, name: string, step_id: string) {
`
const POSTGRES_INIT_CODE = `-- to pin the database use '-- database f/your/path'
-- to stream a large query result to your workspace storage use '-- s3'
-- to only return the result of the last query use '--return_last_result'
-- $1 name1 = default arg
-- $2 name2
@@ -259,6 +260,7 @@ UPDATE demo SET col2 = \$4::INT WHERE col2 = \$2::INT;
`
const MYSQL_INIT_CODE = `-- to pin the database use '-- database f/your/path'
-- to stream a large query result to your workspace storage use '-- s3'
-- :name1 (text) = default arg
-- :name2 (int)
-- :name3 (int)
@@ -267,6 +269,7 @@ UPDATE demo SET col2 = :name3 WHERE col2 = :name2;
`
const BIGQUERY_INIT_CODE = `-- to pin the database use '-- database f/your/path'
-- to stream a large query result to your workspace storage use '-- s3'
-- @name1 (string) = default arg
-- @name2 (integer)
-- @name3 (string[])
@@ -276,6 +279,7 @@ UPDATE \`demodb.demo\` SET col2 = @name4 WHERE col2 = @name2;
`
const ORACLEDB_INIT_CODE = `-- to pin the database use '-- database f/your/path'
-- to stream a large query result to your workspace storage use '-- s3'
-- :name1 (text) = default arg
-- :name2 (int)
-- :name3 (int)
@@ -284,6 +288,7 @@ UPDATE demo SET col2 = :name3 WHERE col2 = :name2;
`
const SNOWFLAKE_INIT_CODE = `-- to pin the database use '-- database f/your/path'
-- to stream a large query result to your workspace storage use '-- s3'
-- ? name1 (varchar) = default arg
-- ? name2 (int)
INSERT INTO demo VALUES (?, ?);
@@ -294,6 +299,7 @@ UPDATE demo SET col2 = ? WHERE col2 = ?;
const MSSQL_INIT_CODE = `-- return_last_result
-- to pin the database use '-- database f/your/path'
-- to stream a large query result to your workspace storage use '-- s3'
-- @P1 name1 (varchar) = default arg
-- @P2 name2 (int)
-- @P3 name3 (int)