feat: add PDF input support to AI agent (#8525)

* feat: add PDF input support to AI agent with user_attachments field

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* test: add integration tests for PDF input and backward compat

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* feat: add ContentPart::File variant for PDF support across all providers

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* refactor: address review feedback on PDF support

- Extract parse_data_url_bytes and mime_to_document_format helpers in Bedrock
- Add is_document_mime helper in ai_types for centralized MIME routing
- Extract s3_object_to_content_part helper to deduplicate image_handler/openai
- Rename AnthropicImageSource to AnthropicBase64Source
- Derive Bedrock DocumentFormat from MIME type instead of hardcoding Pdf

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: merge user message and attachments into single message for Bedrock

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
centdix
2026-03-26 14:55:10 +01:00
committed by GitHub
parent d7f4b950ce
commit e44504c6e9
22 changed files with 349 additions and 87 deletions

View File

@@ -11,8 +11,9 @@ use aws_config::BehaviorVersion;
use aws_credential_types::provider::token::ProvideToken;
use aws_credential_types::provider::ProvideCredentials;
use aws_sdk_bedrockruntime::types::{
ContentBlock, ConversationRole, ConverseStreamOutput, ImageBlock, ImageFormat, ImageSource,
InferenceConfiguration, Message, SystemContentBlock, Tool, ToolInputSchema, ToolSpecification,
ContentBlock, ConversationRole, ConverseStreamOutput, DocumentBlock, DocumentFormat,
DocumentSource, ImageBlock, ImageFormat, ImageSource, InferenceConfiguration, Message,
SystemContentBlock, Tool, ToolInputSchema, ToolSpecification,
};
use aws_sdk_bedrockruntime::Client as BedrockRuntimeClient;
use serde::{Deserialize, Serialize};
@@ -356,13 +357,12 @@ pub fn content_to_text(content: &OpenAIContent) -> String {
}
}
/// Parse image data URL and extract format and base64 data
fn parse_image_data_url(url: &str) -> Result<(ImageFormat, Vec<u8>), Error> {
/// Parse a data URL and extract MIME type and decoded bytes.
fn parse_data_url_bytes(url: &str) -> Result<(String, Vec<u8>), Error> {
if !url.starts_with("data:") {
return Err(Error::internal_err("Image URL must be a data URL"));
return Err(Error::internal_err("URL must be a data URL"));
}
// Parse data:image/png;base64,<data>
let base64_start = url
.find("base64,")
.ok_or_else(|| Error::internal_err("Invalid data URL format"))?;
@@ -372,30 +372,51 @@ fn parse_image_data_url(url: &str) -> Result<(ImageFormat, Vec<u8>), Error> {
.split(';')
.next()
.and_then(|s| s.strip_prefix("data:"))
.unwrap_or("image/png");
.unwrap_or("application/octet-stream");
let bytes = base64::Engine::decode(&base64::engine::general_purpose::STANDARD, base64_data)
.map_err(|e| Error::internal_err(format!("Failed to decode base64 data: {}", e)))?;
Ok((mime_type.to_string(), bytes))
}
/// Parse an image data URL and extract ImageFormat and decoded bytes.
fn parse_image_data_url(url: &str) -> Result<(ImageFormat, Vec<u8>), Error> {
let (mime_type, bytes) = parse_data_url_bytes(url)?;
// Extract format from MIME type (e.g., "image/png" -> "png")
let format_str = mime_type
.rsplit_once('/')
.map(|(_, format)| format)
.unwrap_or("png");
// Map to ImageFormat enum
let format = match format_str {
"png" => ImageFormat::Png,
"jpeg" | "jpg" => ImageFormat::Jpeg,
"gif" => ImageFormat::Gif,
"webp" => ImageFormat::Webp,
_ => ImageFormat::Png, // Default to PNG
_ => ImageFormat::Png,
};
// Decode base64
let bytes = base64::Engine::decode(&base64::engine::general_purpose::STANDARD, base64_data)
.map_err(|e| Error::internal_err(format!("Failed to decode base64 image: {}", e)))?;
Ok((format, bytes))
}
/// Map a MIME type to a Bedrock DocumentFormat.
fn mime_to_document_format(mime_type: &str) -> DocumentFormat {
match mime_type {
"application/pdf" => DocumentFormat::Pdf,
"text/csv" => DocumentFormat::Csv,
"text/html" => DocumentFormat::Html,
"text/plain" => DocumentFormat::Txt,
"application/vnd.openxmlformats-officedocument.wordprocessingml.document" => {
DocumentFormat::Docx
}
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" => {
DocumentFormat::Xlsx
}
_ => DocumentFormat::Pdf,
}
}
/// Convert a ContentPart to Bedrock ContentBlock
fn content_part_to_block(part: &ContentPart) -> Result<Option<ContentBlock>, Error> {
match part {
@@ -418,8 +439,21 @@ fn content_part_to_block(part: &ContentPart) -> Result<Option<ContentBlock>, Err
Ok(Some(ContentBlock::Image(image_block)))
}
ContentPart::File { file } => {
let (mime_type, bytes) = parse_data_url_bytes(&file.file_data)?;
let doc_source = DocumentSource::Bytes(bytes.into());
let doc_block = DocumentBlock::builder()
.format(mime_to_document_format(&mime_type))
.name(file.filename.replace('.', "_"))
.source(doc_source)
.build()
.map_err(|e| {
Error::internal_err(format!("Failed to build document block: {}", e))
})?;
Ok(Some(ContentBlock::Document(doc_block)))
}
ContentPart::S3Object { .. } => {
// S3Objects should be converted to ImageUrl before calling this function
// S3Objects should be converted before calling this function
Ok(None)
}
}

View File

@@ -354,6 +354,13 @@ pub fn convert_content_to_gemini_parts(content: &OpenAIContent) -> Vec<GeminiPar
inline_data: GeminiInlineData { mime_type, data },
})
}
ContentPart::File { file } => {
parse_data_url(&file.file_data).map(|(mime_type, data)| {
GeminiPart::InlineData {
inline_data: GeminiInlineData { mime_type, data },
}
})
}
// S3Objects are handled by the worker
_ => None,
})

View File

@@ -33,6 +33,11 @@ pub enum ContentPart {
ImageUrl {
image_url: ImageUrlData,
},
/// File content block for OpenAI Chat Completions format (PDFs, etc.)
#[serde(rename = "file")]
File {
file: FileData,
},
#[serde(rename = "s3_object")]
S3Object {
s3_object: S3Object,
@@ -44,6 +49,25 @@ pub struct ImageUrlData {
pub url: String, // data:image/png;base64,... or https://...
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct FileData {
pub filename: String,
pub file_data: String, // data:application/pdf;base64,...
}
/// Check if a MIME type represents a document (as opposed to an image).
pub fn is_document_mime(mime_type: &str) -> bool {
matches!(
mime_type,
"application/pdf"
| "text/csv"
| "text/html"
| "text/plain"
| "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
| "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
)
}
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(untagged)]
pub enum OpenAIContent {

View File

@@ -70,6 +70,29 @@ pub async fn download_and_encode_s3_image(
Ok((mime_type.to_string(), base64_data))
}
/// Convert an S3Object to the appropriate ContentPart based on MIME type.
pub async fn s3_object_to_content_part(
s3_object: &S3Object,
client: &AuthedClient,
workspace_id: &str,
) -> Result<ContentPart, Error> {
let (mime_type, file_bytes) =
download_and_encode_s3_image(s3_object, client, workspace_id).await?;
let data_url = format!("data:{};base64,{}", mime_type, file_bytes);
if windmill_common::ai_types::is_document_mime(&mime_type) {
let filename = s3_object
.s3
.rsplit('/')
.next()
.unwrap_or("document.pdf")
.to_string();
Ok(ContentPart::File { file: FileData { filename, file_data: data_url } })
} else {
Ok(ContentPart::ImageUrl { image_url: ImageUrlData { url: data_url } })
}
}
/// Prepare messages for API by converting S3Objects to base64 ImageUrls
pub async fn prepare_messages_for_api(
messages: &[OpenAIMessage],
@@ -92,15 +115,10 @@ pub async fn prepare_messages_for_api(
for part in parts {
match part {
ContentPart::S3Object { s3_object } => {
// Convert S3Object to base64 image URL
let (mime_type, image_bytes) =
download_and_encode_s3_image(s3_object, client, workspace_id)
.await?;
prepared_content.push(ContentPart::ImageUrl {
image_url: ImageUrlData {
url: format!("data:{};base64,{}", mime_type, image_bytes),
},
});
prepared_content.push(
s3_object_to_content_part(s3_object, client, workspace_id)
.await?,
);
}
other => {
// Keep Text and ImageUrl as-is

View File

@@ -92,7 +92,9 @@ pub enum AnthropicRequestContent {
cache_control: Option<CacheControl>,
},
#[serde(rename = "image")]
Image { source: AnthropicImageSource },
Image { source: AnthropicBase64Source },
#[serde(rename = "document")]
Document { source: AnthropicBase64Source },
#[serde(rename = "tool_use")]
ToolUse { id: String, name: String, input: Box<RawValue> },
#[serde(rename = "tool_result")]
@@ -104,9 +106,9 @@ pub enum AnthropicRequestContent {
},
}
/// Image source for Anthropic API
/// Base64 source for Anthropic API (used by both Image and Document content blocks)
#[derive(Serialize, Debug)]
pub struct AnthropicImageSource {
pub struct AnthropicBase64Source {
pub r#type: String,
pub media_type: String,
pub data: String,
@@ -270,10 +272,20 @@ fn convert_content_to_anthropic(content: &Option<OpenAIContent>) -> Vec<Anthropi
}
}
ContentPart::ImageUrl { image_url } => {
// Handle base64 images
if let Some((media_type, data)) = parse_data_url(&image_url.url) {
result.push(AnthropicRequestContent::Image {
source: AnthropicImageSource {
source: AnthropicBase64Source {
r#type: "base64".to_string(),
media_type,
data,
},
});
}
}
ContentPart::File { file } => {
if let Some((media_type, data)) = parse_data_url(&file.file_data) {
result.push(AnthropicRequestContent::Document {
source: AnthropicBase64Source {
r#type: "base64".to_string(),
media_type,
data,

View File

@@ -84,13 +84,13 @@ impl GoogleAIQueryBuilder {
);
}
if let Some(images) = args.images {
for image in images.iter() {
if !image.s3.is_empty() {
let (mime_type, image_bytes) =
download_and_encode_s3_image(image, client, workspace_id).await?;
if let Some(attachments) = args.attachments {
for attachment in attachments.iter() {
if !attachment.s3.is_empty() {
let (mime_type, file_bytes) =
download_and_encode_s3_image(attachment, client, workspace_id).await?;
parts.push(GeminiPart::InlineData {
inline_data: GeminiInlineData { mime_type, data: image_bytes },
inline_data: GeminiInlineData { mime_type, data: file_bytes },
});
}
}

View File

@@ -4,7 +4,7 @@ use serde_json::value::RawValue;
use windmill_common::{ai_providers::AIProvider, client::AuthedClient, error::Error};
use crate::ai::{
image_handler::{download_and_encode_s3_image, prepare_messages_for_api},
image_handler::{prepare_messages_for_api, s3_object_to_content_part},
query_builder::{BuildRequestArgs, ParsedResponse, QueryBuilder, StreamEventProcessor},
sse::{OpenAIResponsesSSEParser, SSEParser},
types::*,
@@ -103,6 +103,8 @@ pub enum ImageGenerationContent {
InputText { text: String },
#[serde(rename = "input_image")]
InputImage { image_url: String },
#[serde(rename = "input_file")]
InputFile { filename: String, file_data: String },
}
/// Output content for assistant messages in Responses API
@@ -240,6 +242,12 @@ fn convert_content_to_responses_format(
image_url: image_url.url.clone(),
})
}
ContentPart::File { file } => {
Some(ImageGenerationContent::InputFile {
filename: file.filename.clone(),
file_data: file.file_data.clone(),
})
}
// S3 objects should have been resolved earlier, but handle gracefully
ContentPart::S3Object { .. } => None,
})
@@ -421,15 +429,26 @@ impl OpenAIQueryBuilder {
let mut content =
vec![ImageGenerationContent::InputText { text: args.user_message.to_string() }];
// Add images if provided
if let Some(images) = args.images {
for image in images.iter() {
if !image.s3.is_empty() {
let (mime_type, image_bytes) =
download_and_encode_s3_image(image, client, workspace_id).await?;
content.push(ImageGenerationContent::InputImage {
image_url: format!("data:{};base64,{}", mime_type, image_bytes),
});
// Add attachments (images, PDFs, etc.) if provided
if let Some(attachments) = args.attachments {
for attachment in attachments.iter() {
if !attachment.s3.is_empty() {
let part =
s3_object_to_content_part(attachment, client, workspace_id).await?;
match part {
ContentPart::File { file } => {
content.push(ImageGenerationContent::InputFile {
filename: file.filename,
file_data: file.file_data,
});
}
ContentPart::ImageUrl { image_url } => {
content.push(ImageGenerationContent::InputImage {
image_url: image_url.url,
});
}
_ => {}
}
}
}
}

View File

@@ -28,7 +28,7 @@ pub struct BuildRequestArgs<'a> {
pub output_type: &'a OutputType,
pub system_prompt: Option<&'a str>,
pub user_message: &'a str,
pub images: Option<&'a [S3Object]>,
pub attachments: Option<&'a [S3Object]>,
pub has_websearch: bool,
}

View File

@@ -26,7 +26,8 @@ use windmill_types::s3::S3Object;
// Re-export shared types from windmill_common
pub use windmill_common::ai_providers::AIPlatform;
pub use windmill_common::ai_types::{
ContentPart, ImageUrlData, OpenAIContent, OpenAIMessage, ToolDef, ToolDefFunction, UrlCitation,
ContentPart, FileData, ImageUrlData, OpenAIContent, OpenAIMessage, ToolDef, ToolDefFunction,
UrlCitation,
};
/// same as OpenAIMessage but with agent_action field included in the serialization
@@ -96,7 +97,8 @@ struct AIAgentArgsRaw {
max_completion_tokens: Option<u32>,
output_schema: Option<OpenAPISchema>,
output_type: Option<OutputType>,
user_images: Option<Vec<S3Object>>,
#[serde(alias = "user_images")]
user_attachments: Option<Vec<S3Object>>,
streaming: Option<bool>,
max_iterations: Option<usize>,
memory: Option<Memory>,
@@ -116,7 +118,7 @@ pub struct AIAgentArgs {
pub max_completion_tokens: Option<u32>,
pub output_schema: Option<OpenAPISchema>,
pub output_type: Option<OutputType>,
pub user_images: Option<Vec<S3Object>>,
pub user_attachments: Option<Vec<S3Object>>,
pub streaming: Option<bool>,
pub max_iterations: Option<usize>,
pub memory: Option<Memory>,
@@ -148,7 +150,7 @@ impl From<AIAgentArgsRaw> for AIAgentArgs {
max_completion_tokens: raw.max_completion_tokens,
output_schema: raw.output_schema,
output_type: raw.output_type,
user_images: raw.user_images,
user_attachments: raw.user_attachments,
streaming: raw.streaming,
max_iterations: raw.max_iterations,
memory,

View File

@@ -714,24 +714,45 @@ pub async fn run_agent(
}
};
// Add user message if provided and non-empty
if let Some(ref user_message) = args.user_message {
if !user_message.is_empty() {
// Add user message and attachments as a single user message
// (Bedrock requires a text block alongside document blocks in the same message)
{
let has_message = args
.user_message
.as_ref()
.map(|m| !m.is_empty())
.unwrap_or(false);
let has_attachments = args
.user_attachments
.as_ref()
.map(|a| !a.is_empty())
.unwrap_or(false);
if has_message && has_attachments {
let mut parts = vec![ContentPart::Text {
text: args.user_message.clone().unwrap(),
}];
for attachment in args.user_attachments.as_ref().unwrap() {
if !attachment.s3.is_empty() {
parts.push(ContentPart::S3Object { s3_object: attachment.clone() });
}
}
messages.push(OpenAIMessage {
role: "user".to_string(),
content: Some(OpenAIContent::Text(user_message.clone())),
content: Some(OpenAIContent::Parts(parts)),
..Default::default()
});
}
}
// Add user images if provided
if let Some(ref user_images) = args.user_images {
if !user_images.is_empty() {
} else if has_message {
messages.push(OpenAIMessage {
role: "user".to_string(),
content: Some(OpenAIContent::Text(args.user_message.clone().unwrap())),
..Default::default()
});
} else if has_attachments {
let mut parts = vec![];
for image in user_images.iter() {
if !image.s3.is_empty() {
parts.push(ContentPart::S3Object { s3_object: image.clone() });
for attachment in args.user_attachments.as_ref().unwrap() {
if !attachment.s3.is_empty() {
parts.push(ContentPart::S3Object { s3_object: attachment.clone() });
}
}
messages.push(OpenAIMessage {
@@ -882,7 +903,7 @@ pub async fn run_agent(
output_type,
system_prompt: args.system_prompt.as_deref(),
user_message: args.user_message.as_deref().unwrap_or(""),
images: args.user_images.as_deref(),
attachments: args.user_attachments.as_deref(),
has_websearch,
};

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -140,10 +140,10 @@ export const AI_AGENT_SCHEMA: Schema = {
format: 'json-schema',
showExpr: "fields.output_type === 'text'"
},
user_images: {
user_attachments: {
type: 'array',
description:
'Array of images to give as input to the AI agent. Requires a configured workspace S3 storage.',
'Array of files (images or PDFs) to give as input to the AI agent. Requires a configured workspace S3 storage.',
items: {
type: 'object',
resourceType: 's3object'
@@ -176,7 +176,7 @@ export const AI_AGENT_SCHEMA: Schema = {
'streaming',
'memory',
'output_schema',
'user_images',
'user_attachments',
'max_completion_tokens',
'temperature',
'max_iterations'
@@ -186,6 +186,12 @@ export const AI_AGENT_SCHEMA: Schema = {
function migrateAiAgentInputTransforms(
inputTransforms: Record<string, InputTransform>
): Record<string, InputTransform> {
// Migrate user_images → user_attachments
if ('user_images' in inputTransforms && !('user_attachments' in inputTransforms)) {
inputTransforms.user_attachments = inputTransforms.user_images
delete inputTransforms.user_images
}
// Check if this has the legacy format
if ('messages_context_length' in inputTransforms && !('memory' in inputTransforms)) {
const legacyValue = inputTransforms.messages_context_length

View File

@@ -136,11 +136,13 @@ ALL_PROVIDERS = [
OPENROUTER,
]
# Vision-capable providers for user_images tests
# Vision-capable providers for user_images/user_attachments tests
VISION_PROVIDERS = [
OPENAI, # gpt-4o-mini supports vision
ANTHROPIC, # claude-3 supports vision
GOOGLE_AI, # gemini supports vision
OPENAI, # gpt-4o-mini supports vision
ANTHROPIC, # claude-3 supports vision
GOOGLE_AI, # gemini supports vision
OPENROUTER, # openai-compatible, vision depends on model
BEDROCK, # bedrock converse API supports vision and documents
]

View File

@@ -0,0 +1,21 @@
%PDF-1.4
1 0 obj<</Type/Catalog/Pages 2 0 R>>endobj
2 0 obj<</Type/Pages/Kids[3 0 R]/Count 1>>endobj
3 0 obj<</Type/Page/Parent 2 0 R/MediaBox[0 0 612 792]/Contents 4 0 R/Resources<</Font<</F1 5 0 R>>>>>>endobj
4 0 obj<</Length 44>>stream
BT /F1 24 Tf 100 700 Td (Hello PDF World) Tj ET
endstream
endobj
5 0 obj<</Type/Font/Subtype/Type1/BaseFont/Helvetica>>endobj
xref
0 6
0000000000 65535 f
0000000009 00000 n
0000000058 00000 n
0000000115 00000 n
0000000266 00000 n
0000000360 00000 n
trailer<</Size 6/Root 1 0 R>>
startxref
431
%%EOF

View File

@@ -0,0 +1,96 @@
"""Tests for AI agent user_attachments (PDF) functionality with S3 storage.
Prerequisites:
- MinIO running on localhost:9000 with bucket 'wmill'
- Test files uploaded to MinIO (test_images/test_image.webp, test_documents/test_document.pdf)
- S3 resource and storage configured in the integration-tests workspace
"""
import pytest
from .conftest import AIAgentTestClient, create_ai_agent_flow, TEST_IMAGE_S3_KEY
from .providers import VISION_PROVIDERS, get_provider_ids
TEST_PDF_S3_KEY = "test_documents/test_document.pdf"
class TestUserAttachments:
"""Test AI agent with PDF attachments from S3 storage."""
@pytest.mark.parametrize(
"provider_config",
VISION_PROVIDERS,
ids=get_provider_ids(VISION_PROVIDERS),
)
def test_pdf_analysis(
self,
client: AIAgentTestClient,
setup_providers,
provider_config,
):
"""Test that AI can analyze a PDF uploaded to S3."""
flow_value = create_ai_agent_flow(
provider_input_transform=provider_config["input_transform"],
system_prompt="You are a helpful assistant that reads documents. Be concise.",
include_user_images=True,
)
# Run the flow with the PDF (test_document.pdf contains "Hello PDF World")
result = client.run_preview_flow(
flow_value=flow_value,
args={
"user_message": "What text does this PDF document contain? Reply with just the text.",
"user_images": [
{
"s3": TEST_PDF_S3_KEY,
"storage": None,
"filename": "test_document.pdf",
}
],
},
)
assert result is not None
assert isinstance(result, (dict, str))
result_text = str(result).lower()
assert "hello" in result_text or "pdf" in result_text, (
f"Expected AI to read PDF content containing 'Hello PDF World', "
f"got: {result}"
)
print(f"PDF analysis result from {provider_config['name']}: {result}")
@pytest.mark.parametrize(
"provider_config",
VISION_PROVIDERS,
ids=get_provider_ids(VISION_PROVIDERS),
)
def test_backward_compat_user_images(
self,
client: AIAgentTestClient,
setup_providers,
provider_config,
):
"""Test that the old user_images field name still works for images."""
flow_value = create_ai_agent_flow(
provider_input_transform=provider_config["input_transform"],
system_prompt="You are a helpful assistant that describes images. Be concise.",
include_user_images=True,
)
result = client.run_preview_flow(
flow_value=flow_value,
args={
"user_message": "Describe what you see in this image in one sentence.",
"user_images": [
{
"s3": TEST_IMAGE_S3_KEY,
"storage": None,
"filename": "test_image.webp",
}
],
},
)
assert result is not None
assert isinstance(result, (dict, str))
print(f"Backward compat image result from {provider_config['name']}: {result}")

View File

@@ -1027,13 +1027,13 @@ components:
JSON Schema object defining structured output format. Used when you need the AI to return data in a specific shape.
Supports standard JSON Schema properties: type, properties, required, items, enum, pattern, minLength, maxLength, minimum, maximum, etc.
Example: { type: 'object', properties: { name: { type: 'string' }, age: { type: 'integer' } }, required: ['name'] }
user_images:
user_attachments:
allOf:
- $ref: '#/components/schemas/InputTransform'
description: |
Array of image references for vision-capable models.
Array of file references (images or PDFs) for the AI agent.
Format: Array<{ bucket: string, key: string }> - S3 object references
Example: [{ bucket: 'my-bucket', key: 'images/photo.jpg' }]
Example: [{ bucket: 'my-bucket', key: 'documents/report.pdf' }]
max_completion_tokens:
allOf:
- $ref: '#/components/schemas/InputTransform'

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long