Processing Architecture¶
This document describes the processing system architecture for FarmCove Delta, which enables intelligent document processing, data extraction, validation, and workflow automation through both AI-driven and manual validation steps.
Table of Contents¶
- Overview
- System Components
- Processing Flow
- Database Schema
- Service Architecture
- Execution Group-Based Callback Architecture
- Dynamic Processor System
- Enhanced Address Matching with AI
- Integration Points
- Prompt Resolution System
- Template Variable Substitution
- Status Tracking
- Error Handling
- Security Considerations
- Key Architecture Principles
- Future Enhancements
- Implementation Examples
- Idempotency and Duplicate Processing Prevention
- Conclusion
Overview¶
The Processing system provides a flexible, template-based approach to processing various entity types (transactions, budgets, schedules) through multi-step workflows. It supports both AI-driven processing and custom validation steps, with parallel execution, conditional logic, issue tracking, and comprehensive status monitoring.
Key Features¶
- Template-Based Workflows: Define reusable processing templates for different entity types
- Hybrid Processing: Supports both AI-driven steps (OCR, classification) and manual validation steps (total checks, format validation)
- Dynamic Processor System: Automatically routes step results to appropriate service processors
- Issue Resolution: Manual validation steps can create issues requiring human resolution
- Parallel Execution: Steps with the same execution group run concurrently
- Conditional Processing: Steps can be skipped based on conditions
- Real-time Status Updates: Track progress of individual steps, overall job, and issue resolution
- Polymorphic Design: Single system handles multiple entity types
- Error Recovery: Optional steps don't block entire job on failure
- Extensible Architecture: Easy to add new processors without modifying core logic
System Components¶
1. App Layer (Next.js)¶
- Service Layer:
/packages/app/src/services/processing/
  - base.ts: Core processing functions and processor registry (CRUD operations only)
  - server.ts: Server-side operations with AI integrations and external services
  - client.ts: Client-side operations for React components
  - actions.ts: Server Actions for Next.js App Router
  - processors/transactionProcessors.ts: Transaction-specific processors
Service Layer Architecture: Services follow a strict layered architecture where:
- Base layer handles pure business logic and database operations
- Server layer manages external service integrations (AI, geocoding, etc.)
- See docs/development/SERVICE_PATTERNS.md for detailed patterns
- API Routes:
  /packages/app/src/app/api/ai-processing/
  - callback/route.ts: Process step results and route to processors
  - start-job/route.ts: Initiate processing jobs
- UI Components:
  - ProcessingStatusIndicator: Visual status display
  - Integration with existing transaction/budget/schedule UI
2. AI Hub (Genkit Service)¶
- Location: /packages/aiHub/
- Purpose: Executes AI processing tasks
- Key Features:
- OCR document processing
- Document classification
- Data extraction with structured output
- Budget linking and validation
- Vendor matching
3. Database Layer¶
- Templates: Define processing workflows
- Jobs: Track processing instances
- Steps: Individual processing tasks
- Integration: Links to existing entities (transactions, budgets, etc.)
Processing Flow¶
Processing Flow Orchestration¶
The AI processing system uses a two-tier function architecture to manage the flow from initial request to AI Hub execution:
High-Level Orchestration: createJobAndStartProcessing¶
Purpose: Entry point for AI processing that handles all setup and orchestration.
Location: packages/app/src/services/processing/server.ts
Responsibilities:
- Validates the processable type
- Creates the AI processing job in the database
- Routes to type-specific processing based on entity type
- Fetches required data (e.g., project ID for transactions)
- Calls the appropriate service's processing function
Usage Example:
// From WhatsApp bot or UI
const result = await createJobAndStartProcessing(
PROCESSABLE_TYPE.TRANSACTIONS,
transactionId,
userId,
'Receipt Processing',
{ uploadedVia: 'whatsapp' },
'application/pdf'
);
Call Flow:
WhatsApp/UI Upload
↓
createJobAndStartProcessing (creates job)
↓
Type-specific router (e.g., startTransactionProcessing)
↓
sendFirstGroupToAIHub (initiates AI Hub processing)
Low-Level Execution: sendFirstGroupToAIHub¶
Purpose: Sends the first execution group of steps to AI Hub after setup is complete.
Location: packages/app/src/services/processing/base.ts
Responsibilities:
- Retrieves job steps from database
- Prepares the first execution group
- Resolves prompts with entity data
- Sends initial request to AI Hub
- Updates job status to IN_PROGRESS
Requirements:
- Job must already exist in database
- All entity data must be provided
- Storage path and file metadata required
Usage Example:
// Called by service-specific processing functions
const success = await sendFirstGroupToAIHub(
PROCESSABLE_TYPE.TRANSACTIONS,
transactionId,
transactionEntity, // Full entity with relations
TASK_TYPE.TRANSACTION_PROCESS,
storagePath,
mimeType,
jobId, // Must already exist
userId,
projectId,
sendToAIHub // Callback to AI Hub
);
Key Differences¶
| Aspect | createJobAndStartProcessing | sendFirstGroupToAIHub |
|---|---|---|
| Purpose | Orchestration & Setup | AI Hub Communication |
| Job Creation | Creates new job | Expects existing job |
| Entity Data | Fetches from database | Must be provided |
| Type Routing | Routes by entity type | Generic for all types |
| Abstraction Level | High (business logic) | Low (technical execution) |
| Entry Point | External APIs/Actions | Internal service calls |
| Error Handling | User-friendly messages | Technical logging |
1. Job Creation¶
// Example: Create processing job for a transaction
const job = await createProcessingJob(
PROCESSABLE_TYPE.TRANSACTIONS,
transactionId,
'Transaction Import Processing',
{ fileUrl, mimeType }
);
2. Template Structure¶
Templates define the processing workflow:
-- Example: Transaction Import Processing Template
Template
├── OCR Step (Group 1, Order 1)
├── Classification Step (Group 2, Order 1, Depends on: OCR)
├── Data Extraction Step (Group 3, Order 1, Depends on: Classification)
├── Budget Linking Step (Group 4, Order 1, Depends on: Data Extraction, Optional)
├── Vendor Matching Step (Group 4, Order 2, Depends on: Data Extraction, Optional)
└── Finish Processing Step (Final Group, MANDATORY for all templates)
Mandatory Finish Processing Step¶
Important: Every processing template MUST include a FinishProcessing step as the final step. This is a base step type that:
- Applies to all task types (transactions, scripts, scenes, schedules, etc.)
- Runs after all other steps complete successfully
- Triggers final status updates in the application
- Does NOT require AI processing - it's a system step
- Enables task-specific completion logic through processor methods
Example configuration:
-- In any template's step configuration
INSERT INTO process_template_steps (
process_template_id,
step_key,
display_name,
execution_group,
depends_on_steps,
ai_hub_step_type,
processor_method
) VALUES (
template_id,
'finish_processing',
'Finish Processing',
5, -- Last execution group
ARRAY['all_previous_steps'], -- Depends on all prior steps
'FinishProcessing',
'finishTransactionProcessing' -- Task-specific finish method
);
3. Step Execution¶
Execution Group-Based Processing¶
The system uses an execution group-based callback mechanism to ensure dependent steps always receive fresh entity data:
- Group-by-Group Execution: Steps are processed in execution groups sequentially
- Fresh Entity Data: After each group completes, a callback is sent to AI Hub with:
- Updated entity data reflecting all completed steps' changes
- Only the steps for the next execution group
- Dependency Resolution: This ensures steps that depend on earlier steps see their database updates
Processing Flow¶
- Initial Send: First execution group steps are sent to AI Hub with initial entity data
- Group Completion: When all steps in a group complete (via webhook callbacks):
- System fetches fresh entity data from database
- Identifies next execution group's ready steps
- Sends callback to AI Hub with updated entity and next steps
- Parallel Execution: Steps within same group execute concurrently
- Result Storage: Each step stores results for use by subsequent steps
- Status Updates: Real-time progress tracking via webhooks
- Finish Processing: Mandatory final step runs to complete the job
- Completion Check: Job marked complete when finish step succeeds
Example Flow¶
Execution Group 1: OCR Extraction
↓ (completes, updates database)
Callback with fresh entity → AI Hub
↓
Execution Group 2: Classification (sees OCR results)
↓ (completes, updates database)
Callback with fresh entity → AI Hub
↓
Execution Group 3: Data Extraction (sees classification)
↓ (completes, updates database)
Callback with fresh entity → AI Hub
↓
Execution Group 4: Budget & Vendor Matching (parallel, both see extraction results)
4. Status Flow¶
PENDING → IN_PROGRESS → COMPLETED
                      ↘ FAILED
                      ↘ PARTIAL_SUCCESS
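The flow above can be modeled as a small transition table. This is an illustrative sketch only; the actual status constants in the codebase may be named differently:

```typescript
// Hypothetical job status constants mirroring the flow diagram above.
const JOB_STATUS = {
  PENDING: 'PENDING',
  IN_PROGRESS: 'IN_PROGRESS',
  COMPLETED: 'COMPLETED',
  FAILED: 'FAILED',
  PARTIAL_SUCCESS: 'PARTIAL_SUCCESS',
} as const;

type JobStatus = (typeof JOB_STATUS)[keyof typeof JOB_STATUS];

// Valid transitions from each status, per the diagram:
// PENDING starts the job; IN_PROGRESS resolves to one of three outcomes.
const TRANSITIONS: Record<JobStatus, JobStatus[]> = {
  PENDING: ['IN_PROGRESS'],
  IN_PROGRESS: ['COMPLETED', 'FAILED', 'PARTIAL_SUCCESS'],
  COMPLETED: [],
  FAILED: [],
  PARTIAL_SUCCESS: [],
};

function canTransition(from: JobStatus, to: JobStatus): boolean {
  return TRANSITIONS[from].includes(to);
}
```

Encoding the transitions explicitly makes invalid updates (e.g. reopening a COMPLETED job) detectable at the webhook boundary.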
Database Schema¶
Core Tables¶
- process_templates: Processing workflow definitions
  - service_name: Optional service name for processor routing
- process_template_steps: Individual step definitions
  - process_template_id: Reference to template
  - processor_method: Optional method name for dynamic processor invocation
  - on_failure_action: Defines behavior when step fails (fail_job, skip_dependents, continue)
- processing_jobs: Job instances
  - process_template_id: Reference to template
- processing_job_steps: Step execution tracking
  - processing_job_id: Reference to job
  - process_template_step_id: Reference to template step
- processing_issues: Issues requiring human resolution
  - processable_type: Type of entity with issue
  - processable_id: ID of entity with issue
  - processing_job_id: Optional reference to job
  - processing_job_step_id: Optional reference to step
- ai_prompts: Reusable AI prompts with variable substitution
Key Relationships¶
process_templates
↓ (1:N)
process_template_steps → ai_prompts (optional)
↓ (1:N via job creation)
processing_jobs
↓ (1:N)
processing_job_steps
↓ (1:N)
processing_issues (optional, for manual validation)
Service Architecture¶
Client-Side Services¶
// packages/app/src/services/processing/client.ts
// Get job status with polling
const status = await getJobStatusWithPolling(jobId);
// Check if entity has active job
const hasActive = await hasActiveProcessingJob(
PROCESSABLE_TYPE.TRANSACTIONS,
transactionId
);
Server-Side Services¶
// packages/app/src/services/processing/server.ts
// High-level orchestration - creates job and starts processing
const result = await createJobAndStartProcessing(
processableType,
processableId,
userId,
title,
metadata,
fileType
);
// Low-level - send first group to AI Hub (job must exist)
const success = await sendFirstGroupToAIHub(
processableType,
processableId,
entity,
taskType,
storagePath,
mimeType,
jobId,
userId,
projectId,
sendToAIHub
);
// Update step status from webhook
await updateStepStatus(jobId, stepKey, status, resultData, errorMessage);
Transaction Service Integration¶
// packages/app/src/services/transaction/base.ts
// Start transaction processing
const success = await startTransactionProcessing(
transactionId,
fileDataUrl,
mimeType,
jobId,
sendToAIHub
);
Execution Group-Based Callback Architecture¶
Overview¶
The system implements an execution group-based callback mechanism that ensures each processing step works with the most current database state. This architecture provides data consistency across dependent steps while maintaining parallel processing efficiency.
Core Design Principles¶
- Execution Groups: Steps are organized into sequential execution groups based on dependencies
- Group-by-Group Processing: Each execution group is processed sequentially
- Fresh Data Callbacks: After each group completes, fresh entity data is fetched from the database and sent with the next group
- Parallel Within Groups: Steps within the same execution group run concurrently for optimal performance
Implementation Details¶
// When execution group completes
async function onExecutionGroupComplete(jobId: string) {
// 1. Check if all steps in current group are done
const currentGroupComplete = await checkGroupCompletion(jobId);
if (currentGroupComplete) {
// 2. Fetch fresh entity data (includes all DB changes)
const freshEntity = await getEntityByJobId(jobId);
// 3. Get next execution group steps
const nextSteps = await getNextExecutionGroupSteps(jobId);
if (nextSteps.length > 0) {
// 4. Send callback to AI Hub with fresh data
await sendToAIHub({
entity: freshEntity, // Fresh data with all updates
metadata: { steps: nextSteps },
});
}
}
}
Key Benefits¶
- Data Consistency: Dependent steps always work with the latest database state
- Simplified Architecture: Clean separation between execution groups
- Optimized Communication: Only relevant steps sent per callback to AI Hub
- Parallel Performance: Steps within the same group execute concurrently
- Clear Dependencies: Execution groups make step dependencies explicit and manageable
Dynamic Processor System¶
The dynamic processor system automatically routes AI processing results to appropriate service methods, eliminating the need for switch statements and making the system easily extensible.
Architecture¶
// Processor Registry
const processorRegistry: ProcessorRegistry = {
transaction: {
processDocumentParsing,
processOcrExtraction,
processProjectRelationshipMatching,
processBudgetItemMatching,
processJustificationAnalysis,
},
// Add more services and processors as needed
};
How It Works¶
- Template Configuration: Each template step can specify:
  - processor_method: The method to call for processing results
  - service_name (on the template): The service containing the processors
- Automatic Routing: When a step completes, the system:
// In processStepResult function
const processor = getProcessor(
template.service_name,
step.template_step.processor_method
);
if (processor) {
const success = await processor(step);
}
- Processor Implementation: Each processor follows a standard interface:
type ProcessorFunction = (
step: ProcessingJobStepWithRelation
) => Promise<boolean>;
Adding New Processors¶
- Create Processor Function:
export async function processNewStepType(
step: ProcessingJobStepWithRelation
): Promise<boolean> {
const { result_data, processing_job_id } = step;
if (!result_data) return true;
// Process the results
// Update relevant tables
return true;
}
- Register in Service:
// In transactionProcessors.ts
export const transactionProcessors = {
processDocumentParsing,
processOcrExtraction,
processNewStepType, // Add new processor
};
- Configure in Database:
-- In template step configuration
UPDATE process_template_steps
SET processor_method = 'processNewStepType'
WHERE step_key = 'new_step_type';
Benefits¶
- No Code Changes: Add new processors without modifying core logic
- Type Safety: Full TypeScript support for processor functions
- Testability: Each processor is independently testable
- Maintainability: Clear separation of concerns
- Extensibility: Easy to add new entity types and processors
Example: Transaction Processing with Enhanced Address Matching¶
// Transaction processors with AI-enhanced operations
export const transactionProcessors = {
// Document parsing extracts type and metadata
processDocumentParsing: async (step) => {
// Updates: document_type, handwritten_notes, etc.
},
// OCR extraction with address creation
processOcrExtraction: async (step) => {
const { result_data } = step;
// Parse AI response from message field
const aiResponse = JSON.parse(result_data.message);
// Create transaction using base layer
const transaction = await createTransactionBase(aiResponse);
// Enhanced address creation with AI matching (server layer)
if (aiResponse.address) {
const address = await createAddressWithAIMatching({
...aiResponse.address,
transactionId: transaction.id,
});
// Update transaction with matched/created address
await updateTransaction(transaction.id, {
address_id: address.id,
});
}
// Return full transaction for context
return transaction;
},
// Project matching links to projects
processProjectRelationshipMatching: async (step) => {
// Updates: project_relationship_id, type, etc.
},
// Finish processing - MANDATORY final step
finishTransactionProcessing: async (step) => {
// Updates transaction status to 'Ready'
// Performs any final cleanup or notifications
// This runs after ALL other steps complete successfully
},
// Budget linking for line items
processBudgetItemMatching: async (step) => {
// Updates: line item budget associations
},
// Justification analysis
processJustificationAnalysis: async (step) => {
// Updates: ai_justification field
},
};
Enhanced Address Matching with AI¶
The system implements intelligent address deduplication using AI-powered semantic matching to prevent duplicate addresses and improve data quality.
Architecture¶
Address creation follows a multi-layer approach:
- Transaction Service (server): Initiates address creation from OCR results
- Address Service (server): Handles AI matching and geocoding
- Address Service (base): Performs pure CRUD operations
AI Matching Process¶
// packages/app/src/services/address/server.ts (example integration)
export async function createAddressWithAIMatching(
addressData: AddressCreateData
): Promise<ServiceResponse<Address>> {
// 1. Fetch existing addresses for comparison
const existingAddresses = await findExistingAddresses();
// 2. Use AI to find semantic matches
const prompt = await resolvePromptWithVariables({
promptIdentifier: { name: 'AddressMatchingPrompt' },
variables: {
entity: { newAddress: addressData, existingAddresses },
entityId: addressData.transactionId,
entityType: 'address',
},
});
const aiMatch = await aiHubClient.findMatch(prompt);
// 3. Return existing if match found
if (aiMatch?.matchId) {
return findAddressById(aiMatch.matchId);
}
// 4. Geocode new address
const geocoded = await geocodeAddress(addressData);
// 5. Create new address
return createAddressBase({ ...addressData, ...geocoded });
}
Benefits¶
- Deduplication: Prevents duplicate addresses with different formatting
- Data Quality: Maintains clean, consistent address database
- Smart Matching: Uses semantic understanding, not just string comparison
- Geocoding Integration: Enriches addresses with geographic data
Integration Points¶
1. WhatsApp Bot (Sana)¶
- Creates processing jobs when documents are uploaded
- Provides status updates during processing
- Displays results when complete
2. Web Application¶
- Shows processing status in transaction/budget/schedule views
- Allows manual retry of failed jobs
- Displays extracted data for review
3. AI Hub Communication¶
- Initial Request: App → SQS → AI Hub (first execution group only)
- Group Completion Callbacks: App → AI Hub (subsequent groups with fresh entity data)
- Status Updates: AI Hub → Webhook → App (for each step completion)
- File Transfer: Via S3 presigned URLs
- Prompt Resolution: Dynamic variable substitution in prompts using @delta/common
Callback Mechanism¶
The callback system ensures data consistency by sending fresh entity data with each execution group:
Callback Structure:
{
type: 'TRANSACTION_PROCESS',
entityId: transactionId,
entity: freshEntityData, // Fetched after group completion
metadata: {
steps: nextGroupSteps, // Only next group's steps
jobId: jobId,
executionGroup: nextGroup
}
}
Key Features:
- Fresh entity data fetched from database after each group completes
- Only the next execution group's steps are sent
- Maintains execution group parallelism
- Ensures data consistency across the processing pipeline
Prompt Resolution System¶
The system provides flexible prompt resolution with dynamic variable substitution, supporting both prompt IDs and names for maximum flexibility.
Prompt Identifier Pattern¶
Prompts can be resolved using either:
- Prompt ID: Direct UUID reference for performance
- Prompt Name: Human-readable identifier for maintainability
// Resolution by ID
const prompt = await resolvePromptWithVariables({
promptIdentifier: { id: 'uuid-123' },
variables: { entity, entityId, entityType },
});
// Resolution by name
const prompt = await resolvePromptWithVariables({
promptIdentifier: { name: 'TransactionOCRExtraction' },
variables: { entity, entityId, entityType },
});
Server-Side Prompt Resolution¶
The server layer provides a wrapper for secure prompt resolution:
// packages/app/src/services/aiPrompt/server.ts (example)
export async function resolvePromptWithVariables(options: {
  promptIdentifier: { name?: string; id?: string };
  variables: PromptVariables;
}): Promise<ServiceResponse<string>> {
// Fetches prompt from database
// Resolves variables using template engine
// Returns processed prompt text
}
Variable Structure¶
Variables follow the PromptVariables type:
interface PromptVariables {
entity: Record<string, any>; // The main entity data
entityId: string; // Entity identifier
entityType: string; // Entity type (transaction, budget, etc.)
}
Template Variable Substitution¶
The system uses the resolveTemplate function from @delta/common for variable substitution. This allows prompts to access entity data and format it appropriately for AI processing.
Variable Syntax¶
Single Field Access¶
Access simple fields using dot notation:
{!entity.field}
{!entity.nested.field}
Example:
Template: "Process transaction {!entity.transaction_code} from {!entity.submission_address}"
Entity: { transaction_code: "TXN-001", submission_address: "john@example.com" }
Result: "Process transaction TXN-001 from john@example.com"
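The dot-notation behavior above can be sketched in a few lines. This is a minimal illustration, not the actual `resolveTemplate` from @delta/common; `getPath` and `resolveSimpleFields` are hypothetical helper names:

```typescript
// Illustrative helper: walk a dot-separated path into a nested object.
// Missing or null values resolve to an empty string, per the docs.
function getPath(entity: Record<string, any>, path: string): string {
  const value = path
    .split('.')
    .reduce((obj: any, key) => (obj == null ? undefined : obj[key]), entity);
  return value == null ? '' : String(value);
}

// Substitute {!entity.some.path} placeholders in a template string.
function resolveSimpleFields(
  template: string,
  entity: Record<string, any>
): string {
  return template.replace(/\{!entity\.([\w.]+)\}/g, (_match, path) =>
    getPath(entity, path)
  );
}
```

Applied to the example above, this yields "Process transaction TXN-001 from john@example.com".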
Array Formatting¶
Extract and format specific fields from arrays:
{!Array:path.to.array|field1,field2,field3}
Example:
Template: "Tax rates: {!Array:entity.project.tax_scheme.tax_rates|id,label,description,percentage}"
Entity: {
project: {
tax_scheme: {
tax_rates: [
{ id: "uuid-123", label: "Standard", description: "Standard VAT", percentage: 20 },
{ id: "uuid-456", label: "Reduced", description: "Reduced VAT", percentage: 5 }
]
}
}
}
Result: "Tax rates: id: uuid-123, label: Standard, description: Standard VAT, percentage: 20, id: uuid-456, label: Reduced, description: Reduced VAT, percentage: 5"
Object Field Extraction¶
Extract specific fields from an object:
{!Object:path.to.object|field1,field2}
Example:
Template: "Project details: {!Object:entity.project|id,title,status}"
Entity: {
project: {
id: "proj-001",
title: "Summer Campaign",
status: "active",
created_at: "2024-01-01",
budget: 50000
}
}
Result: "Project details: id: proj-001, title: Summer Campaign, status: active"
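The array and object forms can be approximated with a small formatter. Again a sketch only, with hypothetical function names; the real implementation in @delta/common may differ in detail:

```typescript
// Format selected fields of a single object as "key: value" pairs,
// matching the {!Object:path|field1,field2} output shown above.
function formatObjectFields(
  obj: Record<string, any>,
  fields: string[]
): string {
  return fields.map((f) => `${f}: ${obj?.[f] ?? ''}`).join(', ');
}

// Format an array of objects, one "key: value" run per element,
// matching the {!Array:path|field1,field2} output shown above.
function formatArrayFields(
  items: Record<string, any>[],
  fields: string[]
): string {
  return items.map((item) => formatObjectFields(item, fields)).join(', ');
}
```

Note that fields not listed (e.g. `created_at`, `budget` in the project example) are simply omitted from the output.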
Real-World Usage in Prompts¶
Document Parsing Prompt¶
-- In ai_prompts table
'Analyze this document for the following:
1. Document type classification
2. Tax eligibility based on: {!entity.project.tax_scheme.rules}
3. Submit any found codes from the document'
OCR Extraction Prompt¶
-- In ai_prompts table
'Extract transaction details from this receipt.
Valid tax rates for line items: {!Array:entity.project.tax_scheme.tax_rates|id,label,description,percentage}
Please use the tax rate ID from the list above for each line item.'
Project Matching Prompt¶
-- In ai_prompts table
'Match this transaction to the appropriate project.
Available projects: {!Array:entity.available_projects|id,project_code,title}
Current project context: {!Object:entity.project|id,title,type}'
Implementation Details¶
- Variable Resolution: Happens in packages/app/src/services/processing/base.ts during prompt preparation
- Entity Context: The entity object contains the full entity with all relations (e.g., TransactionWithRelations)
- Null Handling: Missing or null values resolve to empty strings
- Type Safety: All values are converted to strings for prompt injection
- Generic Design: Works with any entity type and field structure
Status Tracking¶
Job Status¶
- PENDING: Job created, waiting to start
- IN_PROGRESS: At least one step is running
- COMPLETED: All steps finished successfully
- FAILED: All steps failed or critical step failed
- PARTIAL_SUCCESS: Some optional steps failed
Step Status¶
- PENDING: Step created, dependencies not met
- READY: Dependencies satisfied, ready to execute
- IN_PROGRESS: Currently executing
- COMPLETED: Finished successfully
- FAILED: Execution failed
- SKIPPED: Condition not met or dependency failed
Progress Calculation¶
-- Database function: calculate_job_progress
Progress = (Completed Steps / Total Steps) * 100
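The same calculation, sketched in TypeScript for reference (the actual `calculate_job_progress` is a database function; whether SKIPPED steps count toward the total is an assumption here):

```typescript
// Mirror of the calculate_job_progress formula: percentage of steps
// that have completed. Non-completed steps (including SKIPPED) count
// toward the total in this sketch.
function calculateJobProgress(steps: { status: string }[]): number {
  if (steps.length === 0) return 0;
  const completed = steps.filter((s) => s.status === 'COMPLETED').length;
  return Math.round((completed / steps.length) * 100);
}
```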
Error Handling¶
Step-Level Errors¶
- Error message stored in the error_message field
- Optional steps don't block job completion
- Failed dependencies cause dependent steps to be skipped
- Steps can define failure behavior via the on_failure_action field
Failure Action Types¶
The on_failure_action field in process_template_steps determines how the system responds when a step fails:
1. fail_job (default)
   - The entire job is marked as FAILED
   - All dependent steps are automatically skipped
   - Use for critical steps where failure means the entire process cannot continue
   - Example: OCR extraction failing means no data to process
2. skip_dependents
   - The job continues running
   - Only steps that directly depend on this step are skipped
   - Other parallel or independent steps continue normally
   - Use for optional enrichment steps that have specific dependents
   - Example: Budget matching fails, so budget validation is skipped, but vendor matching continues
3. continue
   - The job continues as if the step succeeded
   - Dependent steps still run normally
   - The step is marked as FAILED but doesn't impact flow
   - Use for completely optional steps that don't affect downstream processing
   - Example: Sending a notification fails but shouldn't stop processing
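A hedged sketch of how a callback handler might translate `on_failure_action` into follow-up work when a step fails; the shape of the decision and the field names are illustrative, not the actual service code:

```typescript
type OnFailureAction = 'fail_job' | 'skip_dependents' | 'continue';

interface FailureDecision {
  failJob: boolean; // mark the whole job FAILED
  skipDependents: boolean; // mark direct dependents SKIPPED
}

// Map a failed step's configured action to the required follow-up.
function decideOnFailure(action: OnFailureAction): FailureDecision {
  switch (action) {
    case 'fail_job':
      // Critical step: job fails, everything downstream is skipped.
      return { failJob: true, skipDependents: true };
    case 'skip_dependents':
      // Optional enrichment: only direct dependents are skipped.
      return { failJob: false, skipDependents: true };
    case 'continue':
      // Fully optional: record the failure and carry on.
      return { failJob: false, skipDependents: false };
  }
}
```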
Retry Configuration¶
The on_retry_action field in process_template_steps allows fine-grained control over retry behavior:
Model Switching¶
Configure automatic model switching on retries to handle model-specific failures:
{
"switch_models": ["claude-3-opus", "gpt-4-turbo"]
}
Behavior: Models cycle through the array plus the original model. With 5 max retries:
- Initial attempt: Original model from model_config
- Retry 1: claude-3-opus
- Retry 2: gpt-4-turbo
- Retry 3: Original model
- Retry 4: claude-3-opus
- Retry 5: gpt-4-turbo
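The cycling schedule above can be sketched as follows (an illustration under the assumption that retries are numbered from 1; the actual AI Hub implementation may differ):

```typescript
// Pick the model for a given attempt. Attempt 0 is the initial run
// with the original model; retries cycle through switch_models and
// then wrap back to the original model, matching the schedule above.
function modelForAttempt(
  originalModel: string,
  switchModels: string[],
  attempt: number // 0 = initial attempt, 1..N = retries
): string {
  if (attempt === 0 || switchModels.length === 0) return originalModel;
  const cycle = [...switchModels, originalModel];
  return cycle[(attempt - 1) % cycle.length];
}
```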
Max Tokens Adjustment¶
Increase max output tokens on each retry to handle truncated JSON responses:
{
"raise_max_tokens_by": 2000
}
Behavior: Max tokens increases by the specified amount per retry (no cap):
- Initial: 8000 (from config)
- Retry 1: 10000
- Retry 2: 12000
- Retry 3: 14000
- Retry 4: 16000
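The schedule reduces to a linear formula; a sketch (assuming retries count from 1, with 0 as the initial attempt):

```typescript
// Max tokens for a given retry: the configured initial value plus a
// fixed raise per retry, with no upper cap, per the schedule above.
function maxTokensForRetry(
  initialMaxTokens: number,
  raiseBy: number,
  retry: number // 0 = initial attempt
): number {
  return initialMaxTokens + retry * raiseBy;
}
```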
Combined Configuration¶
Both strategies can be used together:
{
"switch_models": ["claude-3-opus"],
"raise_max_tokens_by": 2000
}
This provides flexibility for handling different failure scenarios.
Job-Level Errors¶
- Job marked as FAILED if any step with the fail_job action fails
- PARTIAL_SUCCESS if some optional steps fail but critical steps succeed
- Retry capability at job or step level with configurable retry actions
Webhook Resilience¶
- Idempotent status updates
- Handles out-of-order updates
- Validates job/step existence before updates
Security Considerations¶
Authentication¶
- Service role for system operations
- User authentication for job creation
- Row-Level Security on all tables
Data Access¶
- Users can only view their own jobs
- Organization-based access control
- Audit trail via created_by fields
File Security¶
- S3 presigned URLs with expiration
- Encrypted file transfer
- No permanent storage of sensitive documents
Key Architecture Principles¶
Service Layer Separation¶
The system strictly enforces separation between service layers:
- Base Services: Pure business logic, no external calls
- Server Services: External integrations (AI, geocoding, email)
- Client Services: React component data fetching
- Sana Services: WhatsApp bot specific logic
For detailed patterns, see docs/development/SERVICE_PATTERNS.md
AI Response Parsing¶
AI responses are consistently parsed from the message field:
// Correct parsing pattern
const aiResponse = JSON.parse(result_data.message);
// NOT: JSON.parse(result_data.response)
Idempotent Design¶
All processing steps are designed to be idempotent:
- Check completion status before processing
- Return success with skipped: true for completed steps
- Atomic database updates prevent partial state
Future Enhancements¶
1. Batch Processing¶
- Process multiple documents in single job
- Bulk status updates
- Parallel document processing
2. Custom Workflows¶
- User-defined templates
- Drag-and-drop workflow builder
- Custom AI prompts per organization (partially implemented with prompt templates)
3. Advanced Analytics¶
- Processing time metrics
- Success rate tracking
- Cost optimization insights
4. Integration Expansion¶
- Email document ingestion
- Direct API uploads
- Third-party system webhooks
5. Enhanced Processing Features¶
- Conditional processor selection based on result data
- Multi-step transactions with rollback support
- Real-time processing progress via WebSockets
- Custom error recovery strategies per step type
Implementation Examples¶
Creating a Processing Job¶
// In WhatsApp bot handler
const job = await createTransactionProcessingJob(transactionId, {
fileUrl: s3Url,
mimeType: 'application/pdf',
uploadedBy: userId,
});
// Start processing - only sends first execution group
await startTransactionProcessing(
transactionId,
fileUrl,
mimeType,
job.id,
sendToAIHub // Callback that sends to AI Hub
);
// Subsequent groups sent automatically via callbacks
// as each execution group completes
Displaying Status¶
// In React component
const ProcessingStatus = ({ transactionId }) => {
const { data: status } = useJobStatus(transactionId);
return (
<ProcessingStatusIndicator
job={status?.job}
steps={status?.steps}
onRetry={handleRetry}
/>
);
};
Webhook Handler¶
// API route handler for status updates from AI Hub
export async function POST(request: Request) {
const { jobId, stepKey, status, resultData } = await request.json();
// Update step status in database
const success = await updateProcessingStepStatus(
jobId,
stepKey,
status,
resultData
);
// Check if execution group is complete
if (success && status === 'COMPLETED') {
await checkAndSendNextGroup(jobId);
}
return Response.json({ success });
}
Callback Handler with Idempotency¶
// API route handler with duplicate processing prevention
export async function POST(request: Request) {
const { jobId, stepKey, status, resultData, errorMessage } =
await request.json();
// Check if step is already completed (idempotency)
const stepStatus = await getStepStatus(jobId, stepKey);
if (stepStatus === PROCESSING_STEP_STATUS.COMPLETED) {
return Response.json({
success: true,
skipped: true,
message: 'Step already completed',
});
}
// Update step status
const updated = await updateProcessingStepStatus(
jobId,
stepKey,
status,
resultData,
errorMessage
);
if (!updated) {
return Response.json({ success: false, error: 'Failed to update status' });
}
// Process results if step completed successfully
if (status === PROCESSING_STEP_STATUS.COMPLETED && resultData) {
const processed = await processStepResult(jobId, stepKey);
// Check if current execution group is complete
const groupComplete = await isExecutionGroupComplete(jobId, stepKey);
if (groupComplete) {
// Fetch fresh entity data
const freshEntity = await getEntityByJobId(jobId);
// Get next execution group steps
const nextSteps = await getNextExecutionGroupSteps(jobId);
if (nextSteps.length > 0) {
// Send callback to AI Hub with fresh data
await sendToAIHub({
type: getTaskType(jobId),
entityId: getEntityId(jobId),
entity: freshEntity,
metadata: {
steps: nextSteps,
jobId,
executionGroup: nextSteps[0].execution_group,
},
});
}
}
return Response.json({ success: processed });
}
return Response.json({ success: true });
}
Idempotency and Duplicate Processing Prevention¶
The system implements comprehensive idempotency to handle duplicate messages from SQS and ensure data integrity.
Key Features¶
- Step Status Checking: Before processing, the system checks if a step is already completed
- Atomic Updates: Database operations use transactions to prevent partial updates
- Message Deduplication: Handles SQS message redelivery gracefully
- Skipped Response: Returns a skipped: true flag for already-processed steps
Implementation¶
// Check step status before processing
export async function getStepStatus(
jobId: string,
stepKey: string
): Promise<string | null> {
const step = await findOne(DatabaseTable.PROCESSING_JOB_STEPS, {
processing_job_id: jobId,
step_key: stepKey,
});
return step?.status || null;
}
// Idempotent processor
export async function processStep(step: ProcessingJobStep) {
// Check if already processed
if (step.status === PROCESSING_STEP_STATUS.COMPLETED) {
return { success: true, skipped: true };
}
// Process the step
const result = await performProcessing(step);
// Atomic status update
await updateStepStatus(step.id, PROCESSING_STEP_STATUS.COMPLETED);
return { success: true, skipped: false };
}
Benefits¶
- Data Integrity: Prevents duplicate processing and data corruption
- Resilience: Handles network failures and message redelivery
- Auditability: Clear tracking of skipped vs processed steps
- Performance: Avoids unnecessary reprocessing
Conclusion¶
The AI Processing Architecture provides a robust, scalable foundation for intelligent document processing in FarmCove Delta. Its template-based approach allows easy extension to new entity types and processing workflows while maintaining consistency and reliability.