feat: track datatable table DDL changes in workspace_diff
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -5044,6 +5044,14 @@ async fn compare_workspaces(
|
||||
compare_two_folders(&db, &source_workspace_id, &fork_workspace_id, &item.path)
|
||||
.await?,
|
||||
),
|
||||
"datatable_table" => {
|
||||
// No stateless comparison yet — assume changes are real
|
||||
Some(ItemComparison {
|
||||
has_changes: true,
|
||||
exists_in_source: true,
|
||||
exists_in_fork: true,
|
||||
})
|
||||
}
|
||||
k => {
|
||||
tracing::error!("Received unrecognized item kind `{k}` with path: `{}` while computing diff of {fork_workspace_id} and {source_workspace_id} workspaces. Skipping this item", item.path);
|
||||
None
|
||||
@@ -5284,6 +5292,8 @@ async fn query_visible_items<'c>(
|
||||
.fetch_all(&mut **tx)
|
||||
.await?
|
||||
}
|
||||
// Datatable tables are always visible (no permission gating)
|
||||
"datatable_table" => paths_vec,
|
||||
_ => vec![], // Unknown kind
|
||||
};
|
||||
|
||||
|
||||
@@ -24351,7 +24351,7 @@ components:
|
||||
kind:
|
||||
type: string
|
||||
enum:
|
||||
["script", "flow", "app", "raw_app", "resource", "variable", "resource_type"]
|
||||
["script", "flow", "app", "raw_app", "resource", "variable", "resource_type", "datatable_table"]
|
||||
description: Type of the item
|
||||
path:
|
||||
type: string
|
||||
|
||||
@@ -230,6 +230,12 @@ fn wrap_ducklake_query(query: &str, ducklake: &str) -> String {
|
||||
format!("{}{}{}", &query[..insert_pos], attach, &query[insert_pos..])
|
||||
}
|
||||
|
||||
/// Describes a DDL operation that modifies a table structure.
|
||||
#[derive(Debug, Clone, PartialEq)]
|
||||
pub struct DdlOperation {
|
||||
pub table_name: String,
|
||||
}
|
||||
|
||||
/// Result of expanding a WM_INTERNAL_DB marker.
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub struct ExpandedQuery {
|
||||
@@ -237,6 +243,8 @@ pub struct ExpandedQuery {
|
||||
/// If set, the worker should use this language instead of the original.
|
||||
/// Used when the expanded code is in a different language (e.g. BigQuery all-tables uses Bun).
|
||||
pub language_override: Option<ScriptLang>,
|
||||
/// If set, this expansion was a DDL operation (CREATE/ALTER/DROP TABLE).
|
||||
pub ddl_operation: Option<DdlOperation>,
|
||||
}
|
||||
|
||||
/// Checks if a SQL script is a WM_INTERNAL_DB marker and expands it into real SQL.
|
||||
@@ -279,9 +287,18 @@ pub fn try_expand_internal_db_query(
|
||||
"INSERT" => expand_insert(json_str, db_type).map(ExpandedQuery::sql),
|
||||
"UPDATE" => expand_update(json_str, db_type).map(ExpandedQuery::sql),
|
||||
// Schema DDL operations
|
||||
"DROP_TABLE" => expand_drop_table(json_str, db_type).map(ExpandedQuery::sql),
|
||||
"CREATE_TABLE" => expand_create_table(json_str, db_type).map(ExpandedQuery::sql),
|
||||
"ALTER_TABLE" => expand_alter_table(json_str, db_type).map(ExpandedQuery::sql),
|
||||
"DROP_TABLE" => {
|
||||
let table_name = extract_ddl_table_name(json_str, "table");
|
||||
expand_drop_table(json_str, db_type).map(|code| ExpandedQuery::ddl(code, table_name))
|
||||
}
|
||||
"CREATE_TABLE" => {
|
||||
let table_name = extract_ddl_table_name(json_str, "name");
|
||||
expand_create_table(json_str, db_type).map(|code| ExpandedQuery::ddl(code, table_name))
|
||||
}
|
||||
"ALTER_TABLE" => {
|
||||
let table_name = extract_ddl_table_name(json_str, "name");
|
||||
expand_alter_table(json_str, db_type).map(|code| ExpandedQuery::ddl(code, table_name))
|
||||
}
|
||||
"CREATE_SCHEMA" => expand_create_schema(json_str, db_type).map(ExpandedQuery::sql),
|
||||
"DROP_SCHEMA" => expand_drop_schema(json_str, db_type).map(ExpandedQuery::sql),
|
||||
// Metadata queries
|
||||
@@ -297,13 +314,26 @@ pub fn try_expand_internal_db_query(
|
||||
Some(result)
|
||||
}
|
||||
|
||||
/// Extract the table name from a DDL JSON payload. The key varies by operation:
|
||||
/// DROP_TABLE uses "table", CREATE_TABLE and ALTER_TABLE use "name".
|
||||
fn extract_ddl_table_name(json_str: &str, key: &str) -> String {
|
||||
serde_json::from_str::<serde_json::Value>(json_str)
|
||||
.ok()
|
||||
.and_then(|v| v.get(key).and_then(|v| v.as_str()).map(|s| s.to_string()))
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
impl ExpandedQuery {
|
||||
fn sql(code: String) -> Self {
|
||||
Self { code, language_override: None }
|
||||
Self { code, language_override: None, ddl_operation: None }
|
||||
}
|
||||
|
||||
fn ddl(code: String, table_name: String) -> Self {
|
||||
Self { code, language_override: None, ddl_operation: Some(DdlOperation { table_name }) }
|
||||
}
|
||||
|
||||
fn with_language(code: String, lang: ScriptLang) -> Self {
|
||||
Self { code, language_override: Some(lang) }
|
||||
Self { code, language_override: Some(lang), ddl_operation: None }
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -43,6 +43,7 @@ pub enum DeployedObject {
|
||||
Settings { setting_type: String },
|
||||
Key { key_type: String },
|
||||
WorkspaceDependencies { path: String },
|
||||
DatatableTable { datatable_name: String, table_name: String },
|
||||
}
|
||||
|
||||
impl DeployedObject {
|
||||
@@ -71,6 +72,9 @@ impl DeployedObject {
|
||||
DeployedObject::Settings { .. } => "settings.yaml".to_string(),
|
||||
DeployedObject::Key { .. } => "encryption_key.yaml".to_string(),
|
||||
DeployedObject::WorkspaceDependencies { path, .. } => path.to_owned(),
|
||||
DeployedObject::DatatableTable { datatable_name, table_name } => {
|
||||
format!("{datatable_name}/{table_name}")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -81,7 +85,8 @@ impl DeployedObject {
|
||||
| Self::ResourceType { .. }
|
||||
| Self::Settings { .. }
|
||||
| Self::Key { .. }
|
||||
| Self::WorkspaceDependencies { .. } => true,
|
||||
| Self::WorkspaceDependencies { .. }
|
||||
| Self::DatatableTable { .. } => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
@@ -111,6 +116,7 @@ impl DeployedObject {
|
||||
DeployedObject::Settings { .. } => None,
|
||||
DeployedObject::Key { .. } => None,
|
||||
DeployedObject::WorkspaceDependencies { .. } => None,
|
||||
DeployedObject::DatatableTable { .. } => None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -139,6 +145,7 @@ impl DeployedObject {
|
||||
DeployedObject::Settings { .. } => "settings",
|
||||
DeployedObject::Key { .. } => "key",
|
||||
DeployedObject::WorkspaceDependencies { .. } => "workspace_dependencies",
|
||||
DeployedObject::DatatableTable { .. } => "datatable_table",
|
||||
}
|
||||
.to_string()
|
||||
}
|
||||
@@ -272,7 +279,10 @@ mod tests {
|
||||
path: "f/folder/script".to_string(),
|
||||
parent_path: Some("f/folder/old_script".to_string()),
|
||||
};
|
||||
assert_eq!(obj.get_parent_path(), Some("f/folder/old_script".to_string()));
|
||||
assert_eq!(
|
||||
obj.get_parent_path(),
|
||||
Some("f/folder/old_script".to_string())
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -313,21 +323,13 @@ mod tests {
|
||||
|
||||
#[test]
|
||||
fn test_get_kind_flow() {
|
||||
let obj = DeployedObject::Flow {
|
||||
path: "test".to_string(),
|
||||
parent_path: None,
|
||||
version: 1,
|
||||
};
|
||||
let obj = DeployedObject::Flow { path: "test".to_string(), parent_path: None, version: 1 };
|
||||
assert_eq!(obj.get_kind(), "flow");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_kind_app() {
|
||||
let obj = DeployedObject::App {
|
||||
path: "test".to_string(),
|
||||
version: 1,
|
||||
parent_path: None,
|
||||
};
|
||||
let obj = DeployedObject::App { path: "test".to_string(), version: 1, parent_path: None };
|
||||
assert_eq!(obj.get_kind(), "app");
|
||||
}
|
||||
|
||||
@@ -346,7 +348,8 @@ mod tests {
|
||||
"http_trigger"
|
||||
);
|
||||
assert_eq!(
|
||||
DeployedObject::WebsocketTrigger { path: "t".to_string(), parent_path: None }.get_kind(),
|
||||
DeployedObject::WebsocketTrigger { path: "t".to_string(), parent_path: None }
|
||||
.get_kind(),
|
||||
"websocket_trigger"
|
||||
);
|
||||
assert_eq!(
|
||||
|
||||
@@ -4212,12 +4212,14 @@ pub async fn run_language_executor(
|
||||
// Expand WM_INTERNAL_DB markers into real SQL before dispatching
|
||||
let expanded_code: String;
|
||||
let mut language = language;
|
||||
let mut ddl_operation: Option<windmill_common::query_builders::DdlOperation> = None;
|
||||
let code = if let Some(ref lang) = language {
|
||||
match windmill_common::query_builders::try_expand_internal_db_query(code, lang) {
|
||||
Some(Ok(expanded)) => {
|
||||
if let Some(lang_override) = expanded.language_override {
|
||||
language = Some(lang_override);
|
||||
}
|
||||
ddl_operation = expanded.ddl_operation;
|
||||
expanded_code = expanded.code;
|
||||
&expanded_code
|
||||
}
|
||||
@@ -4232,6 +4234,40 @@ pub async fn run_language_executor(
|
||||
} else {
|
||||
code
|
||||
};
|
||||
|
||||
// Tally datatable table DDL change for workspace diff tracking
|
||||
if let Some(ref ddl_op) = ddl_operation {
|
||||
if let Connection::Sql(ref db) = conn {
|
||||
// Extract datatable name from the "database" arg (e.g. "datatable://main")
|
||||
let datatable_name = job
|
||||
.args
|
||||
.as_ref()
|
||||
.and_then(|args| args.get("database"))
|
||||
.and_then(|v| serde_json::from_str::<String>(v.get()).ok())
|
||||
.and_then(|s| s.strip_prefix("datatable://").map(|n| n.to_string()));
|
||||
|
||||
if let Some(dt_name) = datatable_name {
|
||||
if let Err(e) = windmill_git_sync::handle_deployment_metadata(
|
||||
&job.permissioned_as_email,
|
||||
&job.created_by,
|
||||
db,
|
||||
&job.workspace_id,
|
||||
windmill_git_sync::DeployedObject::DatatableTable {
|
||||
datatable_name: dt_name,
|
||||
table_name: ddl_op.table_name.clone(),
|
||||
},
|
||||
None,
|
||||
true,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
{
|
||||
tracing::error!(%e, "error handling datatable DDL deployment metadata");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(modules) = modules {
|
||||
#[cfg(feature = "python")]
|
||||
let base_dir = if language == Some(ScriptLang::Python3) {
|
||||
|
||||
Reference in New Issue
Block a user