Processing Architecture

This document describes the processing system architecture for FarmCove Delta, which enables intelligent document processing, data extraction, validation, and workflow automation through both AI-driven and manual validation steps.

Table of Contents

  1. Overview
  2. System Components
  3. Processing Flow
  4. Database Schema
  5. Service Architecture
  6. Execution Group-Based Callback Architecture
  7. Dynamic Processor System
  8. Enhanced Address Matching with AI
  9. Integration Points
  10. Prompt Resolution System
  11. Template Variable Substitution
  12. Status Tracking
  13. Error Handling
  14. Security Considerations
  15. Key Architecture Principles
  16. Future Enhancements
  17. Implementation Examples
  18. Idempotency and Duplicate Processing Prevention
  19. Conclusion

Overview

The Processing system provides a flexible, template-based approach to processing various entity types (transactions, budgets, schedules) through multi-step workflows. It supports both AI-driven processing and custom validation steps, with parallel execution, conditional logic, issue tracking, and comprehensive status monitoring.

Key Features

  • Template-Based Workflows: Define reusable processing templates for different entity types
  • Hybrid Processing: Supports both AI-driven steps (OCR, classification) and manual validation steps (total checks, format validation)
  • Dynamic Processor System: Automatically routes step results to appropriate service processors
  • Issue Resolution: Manual validation steps can create issues requiring human resolution
  • Parallel Execution: Steps with the same execution group run concurrently
  • Conditional Processing: Steps can be skipped based on conditions
  • Real-time Status Updates: Track progress of individual steps, overall job, and issue resolution
  • Polymorphic Design: Single system handles multiple entity types
  • Error Recovery: Optional steps don't block entire job on failure
  • Extensible Architecture: Easy to add new processors without modifying core logic

System Components

1. App Layer (Next.js)

  • Service Layer: /packages/app/src/services/processing/
  • base.ts: Core processing functions and processor registry (CRUD operations only)
  • server.ts: Server-side operations with AI integrations and external services
  • client.ts: Client-side operations for React components
  • actions.ts: Server Actions for Next.js App Router
  • processors/transactionProcessors.ts: Transaction-specific processors

Service Layer Architecture: Services follow a strict layered architecture where:

  • Base layer handles pure business logic and database operations
  • Server layer manages external service integrations (AI, geocoding, etc.)
  • See docs/development/SERVICE_PATTERNS.md for detailed patterns

  • API Routes: /packages/app/src/app/api/ai-processing/

  • callback/route.ts: Process step results and route to processors
  • start-job/route.ts: Initiate processing jobs

  • UI Components:

  • ProcessingStatusIndicator: Visual status display
  • Integration with existing transaction/budget/schedule UI

2. AI Hub (Genkit Service)

  • Location: /packages/aiHub/
  • Purpose: Executes AI processing tasks
  • Key Features:
  • OCR document processing
  • Document classification
  • Data extraction with structured output
  • Budget linking and validation
  • Vendor matching

3. Database Layer

  • Templates: Define processing workflows
  • Jobs: Track processing instances
  • Steps: Individual processing tasks
  • Integration: Links to existing entities (transactions, budgets, etc.)

Processing Flow

Processing Flow Orchestration

The AI processing system uses a two-tier function architecture to manage the flow from initial request to AI Hub execution:

High-Level Orchestration: createJobAndStartProcessing

Purpose: Entry point for AI processing that handles all setup and orchestration.

Location: packages/app/src/services/processing/server.ts

Responsibilities:

  1. Validates the processable type
  2. Creates the AI processing job in the database
  3. Routes to type-specific processing based on entity type
  4. Fetches required data (e.g., project ID for transactions)
  5. Calls the appropriate service's processing function

Usage Example:

// From WhatsApp bot or UI
const result = await createJobAndStartProcessing(
  PROCESSABLE_TYPE.TRANSACTIONS,
  transactionId,
  userId,
  'Receipt Processing',
  { uploadedVia: 'whatsapp' },
  'application/pdf'
);

Call Flow:

WhatsApp/UI Upload
    ↓
createJobAndStartProcessing (creates job)
    ↓
Type-specific router (e.g., startTransactionProcessing)
    ↓
sendFirstGroupToAIHub (initiates AI Hub processing)

Low-Level Execution: sendFirstGroupToAIHub

Purpose: Sends the first execution group of steps to AI Hub after setup is complete.

Location: packages/app/src/services/processing/base.ts

Responsibilities:

  1. Retrieves job steps from database
  2. Prepares the first execution group
  3. Resolves prompts with entity data
  4. Sends initial request to AI Hub
  5. Updates job status to IN_PROGRESS

Requirements:

  • Job must already exist in database
  • All entity data must be provided
  • Storage path and file metadata required

Usage Example:

// Called by service-specific processing functions
const success = await sendFirstGroupToAIHub(
  PROCESSABLE_TYPE.TRANSACTIONS,
  transactionId,
  transactionEntity, // Full entity with relations
  TASK_TYPE.TRANSACTION_PROCESS,
  storagePath,
  mimeType,
  jobId, // Must already exist
  userId,
  projectId,
  sendToAIHub // Callback to AI Hub
);

Key Differences

| Aspect            | createJobAndStartProcessing | sendFirstGroupToAIHub     |
| ----------------- | --------------------------- | ------------------------- |
| Purpose           | Orchestration & setup       | AI Hub communication      |
| Job creation      | Creates new job             | Expects existing job      |
| Entity data       | Fetches from database       | Must be provided          |
| Type routing      | Routes by entity type       | Generic for all types     |
| Abstraction level | High (business logic)       | Low (technical execution) |
| Entry point       | External APIs/Actions       | Internal service calls    |
| Error handling    | User-friendly messages      | Technical logging         |

1. Job Creation

// Example: Create processing job for a transaction
const job = await createProcessingJob(
  PROCESSABLE_TYPE.TRANSACTIONS,
  transactionId,
  'Transaction Import Processing',
  { fileUrl, mimeType }
);

2. Template Structure

Templates define the processing workflow:

-- Example: Transaction Import Processing Template
Template
├── OCR Step (Group 1, Order 1)
├── Classification Step (Group 2, Order 1, Depends on: OCR)
├── Data Extraction Step (Group 3, Order 1, Depends on: Classification)
├── Budget Linking Step (Group 4, Order 1, Depends on: Data Extraction, Optional)
├── Vendor Matching Step (Group 4, Order 2, Depends on: Data Extraction, Optional)
└── Finish Processing Step (Final Group, MANDATORY for all templates)

Mandatory Finish Processing Step

Important: Every processing template MUST include a FinishProcessing step as the final step. This is a base step type that:

  • Applies to all task types (transactions, scripts, scenes, schedules, etc.)
  • Runs after all other steps complete successfully
  • Triggers final status updates in the application
  • Does NOT require AI processing - it's a system step
  • Enables task-specific completion logic through processor methods

Example configuration:

-- In any template's step configuration
INSERT INTO process_template_steps (
  process_template_id,
  step_key,
  display_name,
  execution_group,
  depends_on_steps,
  ai_hub_step_type,
  processor_method
) VALUES (
  template_id,
  'finish_processing',
  'Finish Processing',
  5, -- Last execution group
  ARRAY['all_previous_steps'], -- Depends on all prior steps
  'FinishProcessing',
  'finishTransactionProcessing' -- Task-specific finish method
);

3. Step Execution

Execution Group-Based Processing

The system uses an execution group-based callback mechanism to ensure dependent steps always receive fresh entity data:

  1. Group-by-Group Execution: Steps are processed in execution groups sequentially
  2. Fresh Entity Data: After each group completes, a callback is sent to AI Hub with:
     • Updated entity data reflecting all completed steps' changes
     • Only the steps for the next execution group
  3. Dependency Resolution: This ensures steps that depend on earlier steps see their database updates

Processing Flow

  1. Initial Send: First execution group steps are sent to AI Hub with initial entity data
  2. Group Completion: When all steps in a group complete (via webhook callbacks), the system:
     • Fetches fresh entity data from the database
     • Identifies the next execution group's ready steps
     • Sends a callback to AI Hub with the updated entity and next steps
  3. Parallel Execution: Steps within the same group execute concurrently
  4. Result Storage: Each step stores results for use by subsequent steps
  5. Status Updates: Real-time progress tracking via webhooks
  6. Finish Processing: Mandatory final step runs to complete the job
  7. Completion Check: Job marked complete when the finish step succeeds

Example Flow

Execution Group 1: OCR Extraction
  ↓ (completes, updates database)
Callback with fresh entity → AI Hub
  ↓
Execution Group 2: Classification (sees OCR results)
  ↓ (completes, updates database)
Callback with fresh entity → AI Hub
  ↓
Execution Group 3: Data Extraction (sees classification)
  ↓ (completes, updates database)
Callback with fresh entity → AI Hub
  ↓
Execution Group 4: Budget & Vendor Matching (parallel, both see extraction results)

4. Status Flow

PENDING → IN_PROGRESS → COMPLETED
                      ↘ FAILED
                      ↘ PARTIAL_SUCCESS

Database Schema

Core Tables

  1. process_templates: Processing workflow definitions
     • service_name: Optional service name for processor routing
  2. process_template_steps: Individual step definitions
     • process_template_id: Reference to template
     • processor_method: Optional method name for dynamic processor invocation
     • on_failure_action: Defines behavior when step fails (fail_job, skip_dependents, continue)
  3. processing_jobs: Job instances
     • process_template_id: Reference to template
  4. processing_job_steps: Step execution tracking
     • processing_job_id: Reference to job
     • process_template_step_id: Reference to template step
  5. processing_issues: Issues requiring human resolution
     • processable_type: Type of entity with issue
     • processable_id: ID of entity with issue
     • processing_job_id: Optional reference to job
     • processing_job_step_id: Optional reference to step
  6. ai_prompts: Reusable AI prompts with variable substitution

Key Relationships

process_templates
    ↓ (1:N)
process_template_steps → ai_prompts (optional)
    ↓ (1:N via job creation)
processing_jobs
    ↓ (1:N)
processing_job_steps
    ↓ (1:N)
processing_issues (optional, for manual validation)
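The relationships above can be sketched as TypeScript row shapes. This is an illustrative sketch: only the fields named in the schema section appear, and any additional field names (such as ai_prompt_id) are assumptions, not the actual table definitions.

```typescript
// Hypothetical row shapes mirroring the relationship diagram above.
interface ProcessTemplate {
  id: string;
  service_name?: string; // optional, used for processor routing
}

interface ProcessTemplateStep {
  id: string;
  process_template_id: string; // N:1 → process_templates
  ai_prompt_id?: string; // assumed name for the optional N:1 → ai_prompts link
  processor_method?: string;
  on_failure_action?: 'fail_job' | 'skip_dependents' | 'continue';
}

interface ProcessingJob {
  id: string;
  process_template_id: string; // N:1 → process_templates
}

interface ProcessingJobStep {
  id: string;
  processing_job_id: string; // N:1 → processing_jobs
  process_template_step_id: string; // N:1 → process_template_steps
}

interface ProcessingIssue {
  id: string;
  processable_type: string; // polymorphic entity type
  processable_id: string; // polymorphic entity ID
  processing_job_id?: string; // optional reference to job
  processing_job_step_id?: string; // optional reference to step
}
```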

Service Architecture

Client-Side Services

// packages/app/src/services/processing/client.ts

// Get job status with polling
const status = await getJobStatusWithPolling(jobId);

// Check if entity has active job
const hasActive = await hasActiveProcessingJob(
  PROCESSABLE_TYPE.TRANSACTIONS,
  transactionId
);

Server-Side Services

// packages/app/src/services/processing/server.ts

// High-level orchestration - creates job and starts processing
const result = await createJobAndStartProcessing(
  processableType,
  processableId,
  userId,
  title,
  metadata,
  fileType
);

// Low-level - send first group to AI Hub (job must exist)
const success = await sendFirstGroupToAIHub(
  processableType,
  processableId,
  entity,
  taskType,
  storagePath,
  mimeType,
  jobId,
  userId,
  projectId,
  sendToAIHub
);

// Update step status from webhook
await updateStepStatus(jobId, stepKey, status, resultData, errorMessage);

Transaction Service Integration

// packages/app/src/services/transaction/base.ts

// Start transaction processing
const success = await startTransactionProcessing(
  transactionId,
  fileDataUrl,
  mimeType,
  jobId,
  sendToAIHub
);

Execution Group-Based Callback Architecture

Overview

The system implements an execution group-based callback mechanism that ensures each processing step works with the most current database state. This architecture provides data consistency across dependent steps while maintaining parallel processing efficiency.

Core Design Principles

  1. Execution Groups: Steps are organized into sequential execution groups based on dependencies
  2. Group-by-Group Processing: Each execution group is processed sequentially
  3. Fresh Data Callbacks: After each group completes, fresh entity data is fetched from the database and sent with the next group
  4. Parallel Within Groups: Steps within the same execution group run concurrently for optimal performance

Implementation Details

// When execution group completes
async function onExecutionGroupComplete(jobId: string) {
  // 1. Check if all steps in current group are done
  const currentGroupComplete = await checkGroupCompletion(jobId);

  if (currentGroupComplete) {
    // 2. Fetch fresh entity data (includes all DB changes)
    const freshEntity = await getEntityByJobId(jobId);

    // 3. Get next execution group steps
    const nextSteps = await getNextExecutionGroupSteps(jobId);

    if (nextSteps.length > 0) {
      // 4. Send callback to AI Hub with fresh data
      await sendToAIHub({
        entity: freshEntity, // Fresh data with all updates
        metadata: { steps: nextSteps },
      });
    }
  }
}

Key Benefits

  1. Data Consistency: Dependent steps always work with the latest database state
  2. Simplified Architecture: Clean separation between execution groups
  3. Optimized Communication: Only relevant steps sent per callback to AI Hub
  4. Parallel Performance: Steps within the same group execute concurrently
  5. Clear Dependencies: Execution groups make step dependencies explicit and manageable

Dynamic Processor System

The dynamic processor system automatically routes AI processing results to appropriate service methods, eliminating the need for switch statements and making the system easily extensible.

Architecture

// Processor Registry
const processorRegistry: ProcessorRegistry = {
  transaction: {
    processDocumentParsing,
    processOcrExtraction,
    processProjectRelationshipMatching,
    processBudgetItemMatching,
    processJustificationAnalysis,
  },
  // Add more services and processors as needed
};

How It Works

  1. Template Configuration: Each template step can specify:
     • processor_method: The method to call for processing results
     • service_name (on the template): The service containing the processors

  2. Automatic Routing: When a step completes, the system looks up and invokes the configured processor:

// In processStepResult function
const processor = getProcessor(
  template.service_name,
  step.template_step.processor_method
);
if (processor) {
  const success = await processor(step);
}

  3. Processor Implementation: Each processor follows a standard interface:

type ProcessorFunction = (
  step: ProcessingJobStepWithRelation
) => Promise<boolean>;
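getProcessor itself is referenced but not shown above. A minimal sketch of how such a registry lookup could work (the actual implementation in base.ts may differ):

```typescript
// Minimal registry lookup sketch; the step type is simplified to `unknown`
// here, whereas the real code uses ProcessingJobStepWithRelation.
type ProcessorFunction = (step: unknown) => Promise<boolean>;
type ProcessorRegistry = Record<string, Record<string, ProcessorFunction>>;

const processorRegistry: ProcessorRegistry = {
  transaction: {
    processOcrExtraction: async () => true,
  },
};

function getProcessor(
  serviceName: string | null | undefined,
  processorMethod: string | null | undefined
): ProcessorFunction | undefined {
  // Steps without a configured service or method have no processor to run.
  if (!serviceName || !processorMethod) return undefined;
  return processorRegistry[serviceName]?.[processorMethod];
}
```

Returning `undefined` for unconfigured steps lets the callback route treat processor execution as optional, which matches the `if (processor)` guard above.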

Adding New Processors

  1. Create Processor Function:
export async function processNewStepType(
  step: ProcessingJobStepWithRelation
): Promise<boolean> {
  const { result_data, processing_job_id } = step;
  if (!result_data) return true;

  // Process the results
  // Update relevant tables

  return true;
}
  2. Register in Service:
// In transactionProcessors.ts
export const transactionProcessors = {
  processDocumentParsing,
  processOcrExtraction,
  processNewStepType, // Add new processor
};
  3. Configure in Database:
-- In template step configuration
UPDATE process_template_steps
SET processor_method = 'processNewStepType'
WHERE step_key = 'new_step_type';

Benefits

  • No Code Changes: Add new processors without modifying core logic
  • Type Safety: Full TypeScript support for processor functions
  • Testability: Each processor is independently testable
  • Maintainability: Clear separation of concerns
  • Extensibility: Easy to add new entity types and processors

Example: Transaction Processing with Enhanced Address Matching

// Transaction processors with AI-enhanced operations
export const transactionProcessors = {
  // Document parsing extracts type and metadata
  processDocumentParsing: async (step) => {
    // Updates: document_type, handwritten_notes, etc.
  },

  // OCR extraction with address creation
  processOcrExtraction: async (step) => {
    const { result_data } = step;

    // Parse AI response from message field
    const aiResponse = JSON.parse(result_data.message);

    // Create transaction using base layer
    const transaction = await createTransactionBase(aiResponse);

    // Enhanced address creation with AI matching (server layer)
    if (aiResponse.address) {
      const address = await createAddressWithAIMatching({
        ...aiResponse.address,
        transactionId: transaction.id,
      });

      // Update transaction with matched/created address
      await updateTransaction(transaction.id, {
        address_id: address.id,
      });
    }

    // Return full transaction for context
    return transaction;
  },

  // Project matching links to projects
  processProjectRelationshipMatching: async (step) => {
    // Updates: project_relationship_id, type, etc.
  },

  // Finish processing - MANDATORY final step
  finishTransactionProcessing: async (step) => {
    // Updates transaction status to 'Ready'
    // Performs any final cleanup or notifications
    // This runs after ALL other steps complete successfully
  },

  // Budget linking for line items
  processBudgetItemMatching: async (step) => {
    // Updates: line item budget associations
  },

  // Justification analysis
  processJustificationAnalysis: async (step) => {
    // Updates: ai_justification field
  },
};

Enhanced Address Matching with AI

The system implements intelligent address deduplication using AI-powered semantic matching to prevent duplicate addresses and improve data quality.

Architecture

Address creation follows a multi-layer approach:

  1. Transaction Service (server): Initiates address creation from OCR results
  2. Address Service (server): Handles AI matching and geocoding
  3. Address Service (base): Performs pure CRUD operations

AI Matching Process

// packages/app/src/services/address/server.ts (example integration)
export async function createAddressWithAIMatching(
  addressData: AddressCreateData
): Promise<ServiceResponse<Address>> {
  // 1. Fetch existing addresses for comparison
  const existingAddresses = await findExistingAddresses();

  // 2. Use AI to find semantic matches
  const prompt = await resolvePromptWithVariables({
    promptIdentifier: { name: 'AddressMatchingPrompt' },
    variables: {
      entity: { newAddress: addressData, existingAddresses },
      entityId: addressData.transactionId,
      entityType: 'address',
    },
  });

  const aiMatch = await aiHubClient.findMatch(prompt);

  // 3. Return existing if match found
  if (aiMatch?.matchId) {
    return findAddressById(aiMatch.matchId);
  }

  // 4. Geocode new address
  const geocoded = await geocodeAddress(addressData);

  // 5. Create new address
  return createAddressBase({ ...addressData, ...geocoded });
}

Benefits

  • Deduplication: Prevents duplicate addresses with different formatting
  • Data Quality: Maintains clean, consistent address database
  • Smart Matching: Uses semantic understanding, not just string comparison
  • Geocoding Integration: Enriches addresses with geographic data

Integration Points

1. WhatsApp Bot (Sana)

  • Creates processing jobs when documents are uploaded
  • Provides status updates during processing
  • Displays results when complete

2. Web Application

  • Shows processing status in transaction/budget/schedule views
  • Allows manual retry of failed jobs
  • Displays extracted data for review

3. AI Hub Communication

  • Initial Request: App → SQS → AI Hub (first execution group only)
  • Group Completion Callbacks: App → AI Hub (subsequent groups with fresh entity data)
  • Status Updates: AI Hub → Webhook → App (for each step completion)
  • File Transfer: Via S3 presigned URLs
  • Prompt Resolution: Dynamic variable substitution in prompts using @delta/common

Callback Mechanism

The callback system ensures data consistency by sending fresh entity data with each execution group:

Callback Structure:

{
  type: 'TRANSACTION_PROCESS',
  entityId: transactionId,
  entity: freshEntityData, // Fetched after group completion
  metadata: {
    steps: nextGroupSteps,  // Only next group's steps
    jobId: jobId,
    executionGroup: nextGroup
  }
}

Key Features:

  • Fresh entity data fetched from database after each group completes
  • Only the next execution group's steps are sent
  • Maintains execution group parallelism
  • Ensures data consistency across the processing pipeline

Prompt Resolution System

The system provides flexible prompt resolution with dynamic variable substitution, supporting both prompt IDs and names for maximum flexibility.

Prompt Identifier Pattern

Prompts can be resolved using either:

  • Prompt ID: Direct UUID reference for performance
  • Prompt Name: Human-readable identifier for maintainability

// Resolution by ID
const prompt = await resolvePromptWithVariables({
  promptIdentifier: { id: 'uuid-123' },
  variables: { entity, entityId, entityType },
});

// Resolution by name
const prompt = await resolvePromptWithVariables({
  promptIdentifier: { name: 'TransactionOCRExtraction' },
  variables: { entity, entityId, entityType },
});

Server-Side Prompt Resolution

The server layer provides a wrapper for secure prompt resolution:

// packages/app/src/services/aiPrompt/server.ts (example)
export async function resolvePromptWithVariables(
  promptIdentifier: { name?: string; id?: string },
  variables: PromptVariables
): Promise<ServiceResponse<string>> {
  // Fetches prompt from database
  // Resolves variables using template engine
  // Returns processed prompt text
}

Variable Structure

Variables follow the PromptVariables type:

interface PromptVariables {
  entity: Record<string, any>; // The main entity data
  entityId: string; // Entity identifier
  entityType: string; // Entity type (transaction, budget, etc.)
}

Template Variable Substitution

The system uses the resolveTemplate function from @delta/common for variable substitution. This allows prompts to access entity data and format it appropriately for AI processing.

Variable Syntax

Single Field Access

Access simple fields using dot notation:

{!entity.field}
{!entity.nested.field}

Example:

Template: "Process transaction {!entity.transaction_code} from {!entity.submission_address}"
Entity: { transaction_code: "TXN-001", submission_address: "john@example.com" }
Result: "Process transaction TXN-001 from john@example.com"

Array Formatting

Extract and format specific fields from arrays:

{!Array:path.to.array|field1,field2,field3}

Example:

Template: "Tax rates: {!Array:entity.project.tax_scheme.tax_rates|id,label,description,percentage}"
Entity: {
  project: {
    tax_scheme: {
      tax_rates: [
        { id: "uuid-123", label: "Standard", description: "Standard VAT", percentage: 20 },
        { id: "uuid-456", label: "Reduced", description: "Reduced VAT", percentage: 5 }
      ]
    }
  }
}
Result: "Tax rates: id: uuid-123, label: Standard, description: Standard VAT, percentage: 20, id: uuid-456, label: Reduced, description: Reduced VAT, percentage: 5"

Object Field Extraction

Extract specific fields from an object:

{!Object:path.to.object|field1,field2}

Example:

Template: "Project details: {!Object:entity.project|id,title,status}"
Entity: {
  project: {
    id: "proj-001",
    title: "Summer Campaign",
    status: "active",
    created_at: "2024-01-01",
    budget: 50000
  }
}
Result: "Project details: id: proj-001, title: Summer Campaign, status: active"
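The three substitution rules above can be illustrated with a minimal resolver. This is a sketch only: the real resolveTemplate lives in @delta/common and may differ in details such as escaping, error handling, and nested-array support.

```typescript
// Illustrative implementation of the {!path}, {!Array:path|fields}, and
// {!Object:path|fields} substitution rules described above.
type TemplateVars = { entity: Record<string, any> };

function getPath(obj: any, path: string): any {
  // Dot-notation lookup; missing intermediate objects yield undefined.
  return path.split('.').reduce((o, key) => (o == null ? o : o[key]), obj);
}

function formatFields(obj: Record<string, any> | null, fields: string[]): string {
  // "field: value" pairs, comma-separated; missing values become empty strings.
  return fields.map((f) => `${f}: ${obj?.[f] ?? ''}`).join(', ');
}

function resolveTemplateSketch(template: string, vars: TemplateVars): string {
  const root = { entity: vars.entity };
  return template
    // {!Array:path|f1,f2}: format the listed fields of every array element
    .replace(/\{!Array:([\w.]+)\|([\w,]+)\}/g, (_m, path, fields) =>
      ((getPath(root, path) ?? []) as any[])
        .map((el) => formatFields(el, fields.split(',')))
        .join(', ')
    )
    // {!Object:path|f1,f2}: format the listed fields of one object
    .replace(/\{!Object:([\w.]+)\|([\w,]+)\}/g, (_m, path, fields) =>
      formatFields(getPath(root, path), fields.split(','))
    )
    // {!path}: single value; null or missing resolves to an empty string
    .replace(/\{!([\w.]+)\}/g, (_m, path) => String(getPath(root, path) ?? ''));
}
```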

Real-World Usage in Prompts

Document Parsing Prompt

-- In ai_prompts table
'Analyze this document for the following:
1. Document type classification
2. Tax eligibility based on: {!entity.project.tax_scheme.rules}
3. Submit any found codes from the document'

OCR Extraction Prompt

-- In ai_prompts table
'Extract transaction details from this receipt.
Valid tax rates for line items: {!Array:entity.project.tax_scheme.tax_rates|id,label,description,percentage}
Please use the tax rate ID from the list above for each line item.'

Project Matching Prompt

-- In ai_prompts table
'Match this transaction to the appropriate project.
Available projects: {!Array:entity.available_projects|id,project_code,title}
Current project context: {!Object:entity.project|id,title,type}'

Implementation Details

  1. Variable Resolution: Happens in packages/app/src/services/processing/base.ts during prompt preparation
  2. Entity Context: The entity object contains the full entity with all relations (e.g., TransactionWithRelations)
  3. Null Handling: Missing or null values resolve to empty strings
  4. Type Safety: All values are converted to strings for prompt injection
  5. Generic Design: Works with any entity type and field structure

Status Tracking

Job Status

  • PENDING: Job created, waiting to start
  • IN_PROGRESS: At least one step is running
  • COMPLETED: All steps finished successfully
  • FAILED: All steps failed or critical step failed
  • PARTIAL_SUCCESS: Some optional steps failed

Step Status

  • PENDING: Step created, dependencies not met
  • READY: Dependencies satisfied, ready to execute
  • IN_PROGRESS: Currently executing
  • COMPLETED: Finished successfully
  • FAILED: Execution failed
  • SKIPPED: Condition not met or dependency failed
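The PENDING → READY and PENDING → SKIPPED transitions can be sketched as a pure function over step state. This ignores on_failure_action (a `continue` failure would still allow dependents to run) and condition evaluation; type and field names follow the schema section but the helper itself is illustrative.

```typescript
// Sketch of dependency-driven status transitions for a single step.
type StepStatus = 'PENDING' | 'READY' | 'IN_PROGRESS' | 'COMPLETED' | 'FAILED' | 'SKIPPED';

interface StepState {
  step_key: string;
  status: StepStatus;
  depends_on_steps: string[];
}

function nextStatus(step: StepState, allSteps: StepState[]): StepStatus {
  if (step.status !== 'PENDING') return step.status; // only PENDING steps transition here
  const deps = allSteps.filter((s) => step.depends_on_steps.includes(s.step_key));
  // A failed or skipped dependency skips the dependent step.
  if (deps.some((s) => s.status === 'FAILED' || s.status === 'SKIPPED')) return 'SKIPPED';
  // All dependencies completed: the step is ready to execute.
  if (deps.every((s) => s.status === 'COMPLETED')) return 'READY';
  return 'PENDING'; // dependencies still running
}
```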

Progress Calculation

-- Database function: calculate_job_progress
Progress = (Completed Steps / Total Steps) * 100
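The same formula in TypeScript, for callers that compute progress outside the database (the function name mirrors the database function; rounding to a whole percentage is an assumption):

```typescript
// Progress = (completed steps / total steps) * 100, guarded against empty jobs.
function calculateJobProgress(completedSteps: number, totalSteps: number): number {
  if (totalSteps === 0) return 0; // avoid division by zero
  return Math.round((completedSteps / totalSteps) * 100);
}
```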

Error Handling

Step-Level Errors

  • Error message stored in error_message field
  • Optional steps don't block job completion
  • Failed dependencies cause dependent steps to be skipped
  • Steps can define failure action behavior via on_failure_action field

Failure Action Types

The on_failure_action field in process_template_steps determines how the system responds when a step fails:

  1. fail_job (Default)
     • The entire job is marked as FAILED
     • All dependent steps are automatically skipped
     • Use for critical steps where failure means the entire process cannot continue
     • Example: OCR extraction failing means no data to process

  2. skip_dependents
     • The job continues running
     • Only steps that directly depend on this step are skipped
     • Other parallel or independent steps continue normally
     • Use for optional enrichment steps that have specific dependents
     • Example: Budget matching fails, so budget validation is skipped, but vendor matching continues

  3. continue
     • The job continues as if the step succeeded
     • Dependent steps still run normally
     • The step is marked as FAILED but doesn't impact flow
     • Use for completely optional steps that don't affect downstream processing
     • Example: Sending a notification fails but shouldn't stop processing
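The three actions map cleanly to job-level outcomes. A hedged sketch of that mapping (the function and field names here are illustrative, not the actual service code):

```typescript
// Illustrative mapping from on_failure_action to the resulting job behavior.
type OnFailureAction = 'fail_job' | 'skip_dependents' | 'continue';

interface FailureOutcome {
  jobStatus: 'FAILED' | 'IN_PROGRESS';
  skipDirectDependents: boolean;
}

function onStepFailure(action: OnFailureAction): FailureOutcome {
  switch (action) {
    case 'fail_job': // critical step: whole job fails, dependents skipped
      return { jobStatus: 'FAILED', skipDirectDependents: true };
    case 'skip_dependents': // job continues; only direct dependents are skipped
      return { jobStatus: 'IN_PROGRESS', skipDirectDependents: true };
    case 'continue': // treated as success for flow purposes
      return { jobStatus: 'IN_PROGRESS', skipDirectDependents: false };
  }
}
```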

Retry Configuration

The on_retry_action field in process_template_steps allows fine-grained control over retry behavior:

Model Switching

Configure automatic model switching on retries to handle model-specific failures:

{
  "switch_models": ["claude-3-opus", "gpt-4-turbo"]
}

Behavior: Models cycle through the array plus the original model. With 5 max retries:

  • Initial attempt: Original model from model_config
  • Retry 1: claude-3-opus
  • Retry 2: gpt-4-turbo
  • Retry 3: Original model
  • Retry 4: claude-3-opus
  • Retry 5: gpt-4-turbo
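The rotation above reduces to a modular index into the array of switch models plus the original. A sketch (function name is illustrative):

```typescript
// Retries cycle through switch_models plus the original model;
// retryNumber 0 is the initial attempt with the original model.
function modelForAttempt(
  originalModel: string,
  switchModels: string[],
  retryNumber: number
): string {
  if (retryNumber === 0) return originalModel;
  const cycle = [...switchModels, originalModel];
  return cycle[(retryNumber - 1) % cycle.length];
}
```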

Max Tokens Adjustment

Increase max output tokens on each retry to handle truncated JSON responses:

{
  "raise_max_tokens_by": 2000
}

Behavior: Max tokens increases by the specified amount per retry (no cap):

  • Initial: 8000 (from config)
  • Retry 1: 10000
  • Retry 2: 12000
  • Retry 3: 14000
  • Retry 4: 16000

Combined Configuration

Both strategies can be used together:

{
  "switch_models": ["claude-3-opus"],
  "raise_max_tokens_by": 2000
}

This provides flexibility for handling different failure scenarios.

Job-Level Errors

  • Job marked as FAILED if any step with fail_job action fails
  • PARTIAL_SUCCESS if some optional steps fail but critical steps succeed
  • Retry capability at job or step level with configurable retry actions

Webhook Resilience

  • Idempotent status updates
  • Handles out-of-order updates
  • Validates job/step existence before updates

Security Considerations

Authentication

  • Service role for system operations
  • User authentication for job creation
  • Row-Level Security on all tables

Data Access

  • Users can only view their own jobs
  • Organization-based access control
  • Audit trail via created_by fields

File Security

  • S3 presigned URLs with expiration
  • Encrypted file transfer
  • No permanent storage of sensitive documents

Key Architecture Principles

Service Layer Separation

The system strictly enforces separation between service layers:

  • Base Services: Pure business logic, no external calls
  • Server Services: External integrations (AI, geocoding, email)
  • Client Services: React component data fetching
  • Sana Services: WhatsApp bot specific logic

For detailed patterns, see docs/development/SERVICE_PATTERNS.md

AI Response Parsing

AI responses are consistently parsed from the message field:

// Correct parsing pattern
const aiResponse = JSON.parse(result_data.message);
// NOT: JSON.parse(result_data.response)

Idempotent Design

All processing steps are designed to be idempotent:

  • Check completion status before processing
  • Return success with skipped: true for completed steps
  • Atomic database updates prevent partial state

Future Enhancements

1. Batch Processing

  • Process multiple documents in single job
  • Bulk status updates
  • Parallel document processing

2. Custom Workflows

  • User-defined templates
  • Drag-and-drop workflow builder
  • Custom AI prompts per organization (partially implemented with prompt templates)

3. Advanced Analytics

  • Processing time metrics
  • Success rate tracking
  • Cost optimization insights

4. Integration Expansion

  • Email document ingestion
  • Direct API uploads
  • Third-party system webhooks

5. Enhanced Processing Features

  • Conditional processor selection based on result data
  • Multi-step transactions with rollback support
  • Real-time processing progress via WebSockets
  • Custom error recovery strategies per step type

Implementation Examples

Creating a Processing Job

// In WhatsApp bot handler
const job = await createTransactionProcessingJob(transactionId, {
  fileUrl: s3Url,
  mimeType: 'application/pdf',
  uploadedBy: userId,
});

// Start processing - only sends first execution group
await startTransactionProcessing(
  transactionId,
  fileUrl,
  mimeType,
  job.id,
  sendToAIHub // Callback that sends to AI Hub
);

// Subsequent groups sent automatically via callbacks
// as each execution group completes

Displaying Status

// In React component
const ProcessingStatus = ({ transactionId }) => {
  const { data: status } = useJobStatus(transactionId);

  return (
    <ProcessingStatusIndicator
      job={status?.job}
      steps={status?.steps}
      onRetry={handleRetry}
    />
  );
};

Webhook Handler

// API route handler for status updates from AI Hub
export async function POST(request: Request) {
  const { jobId, stepKey, status, resultData } = await request.json();

  // Update step status in database
  const success = await updateProcessingStepStatus(
    jobId,
    stepKey,
    status,
    resultData
  );

  // Check if execution group is complete
  if (success && status === 'COMPLETED') {
    await checkAndSendNextGroup(jobId);
  }

  return Response.json({ success });
}

Callback Handler with Idempotency

// API route handler with duplicate processing prevention
export async function POST(request: Request) {
  const { jobId, stepKey, status, resultData, errorMessage } =
    await request.json();

  // Check if step is already completed (idempotency)
  const stepStatus = await getStepStatus(jobId, stepKey);
  if (stepStatus === PROCESSING_STEP_STATUS.COMPLETED) {
    return Response.json({
      success: true,
      skipped: true,
      message: 'Step already completed',
    });
  }

  // Update step status
  const updated = await updateProcessingStepStatus(
    jobId,
    stepKey,
    status,
    resultData,
    errorMessage
  );

  if (!updated) {
    return Response.json({ success: false, error: 'Failed to update status' });
  }

  // Process results if step completed successfully
  if (status === PROCESSING_STEP_STATUS.COMPLETED && resultData) {
    const processed = await processStepResult(jobId, stepKey);

    // Check if current execution group is complete
    const groupComplete = await isExecutionGroupComplete(jobId, stepKey);

    if (groupComplete) {
      // Fetch fresh entity data
      const freshEntity = await getEntityByJobId(jobId);

      // Get next execution group steps
      const nextSteps = await getNextExecutionGroupSteps(jobId);

      if (nextSteps.length > 0) {
        // Send callback to AI Hub with fresh data
        await sendToAIHub({
          type: getTaskType(jobId),
          entityId: getEntityId(jobId),
          entity: freshEntity,
          metadata: {
            steps: nextSteps,
            jobId,
            executionGroup: nextSteps[0].execution_group,
          },
        });
      }
    }

    return Response.json({ success: processed });
  }

  return Response.json({ success: true });
}

Idempotency and Duplicate Processing Prevention

The system implements comprehensive idempotency to handle duplicate messages from SQS and ensure data integrity.

Key Features

  1. Step Status Checking: Before processing, the system checks if a step is already completed
  2. Atomic Updates: Database operations use transactions to prevent partial updates
  3. Message Deduplication: Handles SQS message redelivery gracefully
  4. Skipped Response: Returns skipped: true flag for already-processed steps

Implementation

// Check step status before processing
export async function getStepStatus(
  jobId: string,
  stepKey: string
): Promise<string | null> {
  const step = await findOne(DatabaseTable.PROCESSING_JOB_STEPS, {
    processing_job_id: jobId,
    step_key: stepKey,
  });
  return step?.status || null;
}

// Idempotent processor
export async function processStep(step: ProcessingJobStep) {
  // Check if already processed
  if (step.status === PROCESSING_STEP_STATUS.COMPLETED) {
    return { success: true, skipped: true };
  }

  // Process the step
  const result = await performProcessing(step);

  // Atomic status update
  await updateStepStatus(step.id, PROCESSING_STEP_STATUS.COMPLETED);

  return { success: true, skipped: false };
}

Benefits

  • Data Integrity: Prevents duplicate processing and data corruption
  • Resilience: Handles network failures and message redelivery
  • Auditability: Clear tracking of skipped vs processed steps
  • Performance: Avoids unnecessary reprocessing

Conclusion

The AI Processing Architecture provides a robust, scalable foundation for intelligent document processing in FarmCove Delta. Its template-based approach allows easy extension to new entity types and processing workflows while maintaining consistency and reliability.