Constraints
Constraints API
Constraints in EvidentSource provide optimistic concurrency control and consistency guarantees for batch transactions. They ensure database state meets expectations before committing events.
Overview
Constraints are conditions that must be satisfied for a batch to commit:
- Checked atomically with the batch transaction
- Provide optimistic concurrency control
- Enable ordering guarantees
- Support complex consistency requirements
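The idea behind the list above can be sketched with a toy in-memory log. The `ToyLog` type, its `append_if_unchanged` method, and `AppendError` are hypothetical names invented for this illustration and are not part of the EvidentSource API. The sketch only shows how a revision check that is atomic with the write rejects a writer whose view of the log is stale.

```rust
/// Toy in-memory event log; hypothetical, not the EvidentSource API.
#[derive(Debug, Default)]
struct ToyLog {
    events: Vec<String>,
}

#[derive(Debug, PartialEq)]
enum AppendError {
    /// The log moved on after the caller read it.
    Conflict { expected: u64, actual: u64 },
}

impl ToyLog {
    /// In this toy model the revision is the number of events written so far.
    fn revision(&self) -> u64 {
        self.events.len() as u64
    }

    /// Append `new_events` only if the log is still at the `expected` revision.
    /// Taking `&mut self` makes the check and the write one indivisible step.
    fn append_if_unchanged(
        &mut self,
        expected: u64,
        new_events: Vec<String>,
    ) -> Result<u64, AppendError> {
        let actual = self.revision();
        if actual != expected {
            return Err(AppendError::Conflict { expected, actual });
        }
        self.events.extend(new_events);
        Ok(self.revision())
    }
}
```

If two writers both read revision 0, the first append succeeds and the second receives a `Conflict`, so it has to reload and decide again instead of silently overwriting the first writer's decision.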
Constraint Types
MinRevision Constraint
Ensures the database (or the set of events matching a specific selector) hasn’t changed since a known revision.
Use Cases
- Read-modify-write patterns
- Preventing lost updates
- Ensuring sequential processing
Examples

    // Database-wide constraint
    BatchConstraint::MinRevision(42)

    // Stream-specific constraint
    BatchConstraint::MinRevisionForSelector(
        100,
        EventSelector::Stream("order-service")
    )

gRPC Definition

    message MinRevisionConstraint {
      uint64 revision = 1;
      optional EventSelector selector = 2;
    }

Behavior
- If selector is omitted: Database must be at exactly the specified revision
- If selector is provided: Only events matching the selector must be at the revision
- Fails with FAILED_PRECONDITION if constraint is violated
MaxRevision Constraint
Ensures no events exist beyond a specified revision for a selector.
Use Cases
- Maintaining stream order
- Preventing duplicate processing
- Implementing exactly-once semantics
- Ensuring append-only patterns
Examples

    // Ensure stream hasn't progressed beyond revision 50
    BatchConstraint::MaxRevisionForSelector(
        50,
        EventSelector::Stream("payment-service")
    )

    // Ensure subject has no events after revision 100
    BatchConstraint::MaxRevisionForSelector(
        100,
        EventSelector::Subject("order-123")
    )

Behavior
- Always requires a selector (no database-wide max revision)
- Checks if any matching events exist after the specified revision
- Useful for ensuring ordering within streams or subjects
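The check described above can be sketched over a plain slice of events. `StoredEvent` and `max_revision_holds` are hypothetical names for this illustration, and matching on a subject string is a simplifying assumption standing in for a real selector; none of this is the EvidentSource implementation.

```rust
/// Minimal stand-in for a stored event; hypothetical, not the EvidentSource type.
struct StoredEvent {
    revision: u64,
    subject: String,
}

/// Returns true when no event for `subject` has a revision greater than `max`.
/// Events for other subjects are ignored, whatever their revision.
fn max_revision_holds(events: &[StoredEvent], subject: &str, max: u64) -> bool {
    !events
        .iter()
        .any(|e| e.subject == subject && e.revision > max)
}
```

With a bound of 0 the function returns true only when the subject has no events at all, which is the shape used for "create if absent" and duplicate-detection checks.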
RevisionRange Constraint
Ensures a specific range of revisions exists in the database.
Use Cases
- Dependency checking
- Ensuring prerequisite events exist
- Maintaining causal ordering
- Validating event chains
Examples

    // Ensure revisions 10-20 exist
    BatchConstraint::RevisionRange(10, 20, None)

    // Ensure specific stream has events in range
    BatchConstraint::RevisionRange(
        10,
        20,
        Some(EventSelector::Stream("inventory-service"))
    )

Behavior
- Verifies all revisions in the range exist
- Can be scoped to specific selectors
- Useful for ensuring dependencies are met
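A range check of this kind can be sketched as a set lookup. The function name `missing_in_range` is hypothetical, and treating both ends of the range as inclusive is an assumption based on the "revisions 10-20" wording; the source does not state the bounds precisely.

```rust
use std::collections::BTreeSet;

/// Returns the revisions in `start..=end` (inclusive, by assumption) that are
/// absent from `present`. An empty result means the range is fully covered.
fn missing_in_range(present: &BTreeSet<u64>, start: u64, end: u64) -> Vec<u64> {
    (start..=end).filter(|r| !present.contains(r)).collect()
}
```

Returning the missing revisions rather than a bare boolean makes it easier to report which prerequisite is absent. Note that an empty range (`start > end`) yields an empty result in this sketch.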
Complex Constraint Patterns
Optimistic Locking
Implement optimistic locking for aggregate updates:

    // Load aggregate
    let events = query_aggregate_events("order-123").await?;
    let last_revision = events.last().map(|e| e.revision()).unwrap_or(0);

    // Process command
    let new_events = process_command(command, events)?;

    // Save with concurrency check
    let constraints = vec![
        BatchConstraint::MaxRevisionForSelector(
            last_revision,
            EventSelector::Subject("order-123")
        )
    ];

    app.transact_batch(batch_id, "my-events", new_events, constraints).await?;

Distributed Saga Coordination
Ensure saga steps execute in order:

    // Saga step 2 requires step 1 completion
    let constraints = vec![
        // Step 1 must have completed
        BatchConstraint::MinRevisionForSelector(
            step1_revision,
            EventSelector::new()
                .with_subject("saga-123")
                .with_type("Step1Completed")
        ),
        // No other steps have executed
        BatchConstraint::MaxRevisionForSelector(
            step1_revision,
            EventSelector::Subject("saga-123")
        ),
    ];

Exactly-Once Processing
Prevent duplicate event processing:

    // Track processed message IDs
    let constraints = vec![
        // Ensure this message hasn't been processed
        BatchConstraint::MaxRevisionForSelector(
            0, // No events should exist
            EventSelector::Subject(format!("message-{}", message_id))
        ),
    ];

    let events = vec![
        ProposedEvent::new(
            format!("message-{}", message_id),
            "message-processor",
            "MessageProcessed",
            json!({"processed_at": Utc::now()})
        ),
        // ... actual business events
    ];

    app.transact_batch(batch_id, "my-events", events, constraints).await?;

Conditional Writes
Write only if conditions are met:

    // Only create if doesn't exist
    let constraints = vec![
        BatchConstraint::MaxRevisionForSelector(
            0,
            EventSelector::Subject("user-email@example.com")
        ),
    ];

    // Only update if exists
    let constraints = vec![
        BatchConstraint::MinRevisionForSelector(
            1, // At least one event must exist
            EventSelector::Subject("user-123")
        ),
    ];

Constraint Evaluation
Evaluation Order
- All constraints are evaluated before any events are written
- Constraints see the same database state (snapshot isolation)
- If any constraint fails, no events are written
- Constraint checks are atomic with the batch transaction
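The evaluation rules above can be sketched as a toy store that checks every constraint against the same state before appending anything. `ToyStore`, `ToyConstraint`, and `transact` are hypothetical names for this illustration and do not mirror the EvidentSource API. Exclusive access through `&mut self` stands in for the atomicity and snapshot isolation that a real server would have to provide.

```rust
/// Hypothetical constraint model; names do not mirror the EvidentSource API.
enum ToyConstraint {
    /// The store must be at exactly this revision.
    RevisionIs(u64),
    /// No event for the subject may have a revision above the bound.
    NoEventsAfter { subject: String, max: u64 },
}

#[derive(Default)]
struct ToyStore {
    /// (revision, subject) pairs in commit order.
    events: Vec<(u64, String)>,
}

impl ToyStore {
    fn revision(&self) -> u64 {
        self.events.len() as u64
    }

    fn holds(&self, constraint: &ToyConstraint) -> bool {
        match constraint {
            ToyConstraint::RevisionIs(expected) => self.revision() == *expected,
            ToyConstraint::NoEventsAfter { subject, max } => !self
                .events
                .iter()
                .any(|(rev, subj)| subj == subject && rev > max),
        }
    }

    /// Evaluate every constraint first, then write one event per subject.
    /// On failure, returns the index of the first violated constraint and
    /// leaves the store untouched.
    fn transact(
        &mut self,
        subjects: &[&str],
        constraints: &[ToyConstraint],
    ) -> Result<u64, usize> {
        if let Some(failed) = constraints.iter().position(|c| !self.holds(c)) {
            return Err(failed);
        }
        for &subject in subjects {
            let next = self.revision() + 1;
            self.events.push((next, subject.to_string()));
        }
        Ok(self.revision())
    }
}
```

Because the write loop runs only after every constraint has passed, a batch with one failing constraint leaves the revision unchanged, even when the other constraints in the same batch hold.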
Performance Considerations

    // Efficient: Single constraint check
    let constraints = vec![
        BatchConstraint::MinRevision(database_revision),
    ];

    // Less efficient: Multiple selector-based constraints
    let constraints = vec![
        BatchConstraint::MinRevisionForSelector(10, selector1),
        BatchConstraint::MinRevisionForSelector(20, selector2),
        BatchConstraint::MinRevisionForSelector(30, selector3),
    ];

    // Optimize by combining when possible
    let constraints = vec![
        BatchConstraint::MinRevision(database_revision),
        BatchConstraint::MaxRevisionForSelector(
            last_stream_revision,
            EventSelector::Stream("critical-stream")
        ),
    ];

Error Handling
Constraint Violations

    // ConstraintViolation is assumed to be exported alongside
    // BatchTransactionError; adjust the path to match the actual crate layout.
    use evidentsource_api::{BatchTransactionError, ConstraintViolation};

    match app.transact_batch(batch_id, "my-events", events, constraints).await {
        Ok(batch) => {
            println!("Success at revision {}", batch.0.revision());
        }
        Err(BatchTransactionError::ConstraintViolation(violation)) => {
            match violation {
                ConstraintViolation::MinRevisionNotMet { expected, actual } => {
                    eprintln!("Database changed: expected {}, got {}", expected, actual);
                }
                ConstraintViolation::MaxRevisionExceeded { max, found } => {
                    eprintln!("Events exist beyond {}: found {}", max, found);
                }
                ConstraintViolation::RangeNotSatisfied { start, end } => {
                    eprintln!("Missing events in range {}-{}", start, end);
                }
            }
        }
        Err(e) => eprintln!("Transaction failed: {}", e),
    }

Retry Strategies

    use std::time::Duration;

    /// Retry with exponential backoff on constraint violations.
    /// The `_` placeholders stand for Application's type parameters, which
    /// are elided here and must be spelled out in real code.
    async fn transact_with_retry(
        app: &Application<_, _, _, _, _, _, _>,
        batch_id: Uuid,
        database: &str,
        events: Vec<ProposedEvent>,
        max_retries: u32,
    ) -> Result<BatchDetail, BatchTransactionError> {
        let mut retries = 0;
        let mut backoff = Duration::from_millis(10);

        loop {
            // Re-read the current state on every attempt so the constraint
            // reflects the latest revision
            let database_state = app.latest_database(database).await?;
            let constraints = vec![
                BatchConstraint::MinRevision(database_state.revision()),
            ];

            match app.transact_batch(batch_id, database, events.clone(), constraints).await {
                Ok(result) => return Ok(result.0),
                Err(BatchTransactionError::ConstraintViolation(_)) if retries < max_retries => {
                    retries += 1;
                    tokio::time::sleep(backoff).await;
                    backoff *= 2; // Exponential backoff
                    continue;
                }
                // Non-constraint errors, or retries exhausted
                Err(e) => return Err(e),
            }
        }
    }

Best Practices
1. Choose Appropriate Constraints

    // For user commands: Check aggregate hasn't changed
    BatchConstraint::MaxRevisionForSelector(
        last_revision,
        EventSelector::Subject(aggregate_id)
    )

    // For system processes: Check database state
    BatchConstraint::MinRevision(expected_revision)

    // For idempotent operations: Check hasn't been done
    BatchConstraint::MaxRevisionForSelector(
        0,
        EventSelector::Subject(operation_id)
    )

2. Minimize Constraint Scope

    // Good: Specific selector
    BatchConstraint::MaxRevisionForSelector(
        100,
        EventSelector::Subject("order-123")
    )

    // Avoid: Database-wide constraint when not needed
    BatchConstraint::MinRevision(database_revision) // Conflicts with every concurrent write

3. Handle Violations Gracefully

    enum RetryStrategy {
        Immediate,
        ExponentialBackoff,
        GiveUp,
    }

    fn determine_retry_strategy(error: &BatchTransactionError) -> RetryStrategy {
        match error {
            BatchTransactionError::ConstraintViolation(_) => RetryStrategy::ExponentialBackoff,
            BatchTransactionError::InvalidBatch(_) => RetryStrategy::GiveUp,
            _ => RetryStrategy::Immediate,
        }
    }

4. Use Constraints for Business Rules

    // Enforce business invariants
    let constraints = vec![
        // The account must have been created before its balance is updated
        BatchConstraint::MinRevisionForSelector(
            1, // Account must exist
            EventSelector::new()
                .with_subject(account_id)
                .with_type("AccountCreated")
        ),
        // No concurrent modifications
        BatchConstraint::MaxRevisionForSelector(
            last_balance_revision,
            EventSelector::new()
                .with_subject(account_id)
                .with_type("BalanceUpdated")
        ),
    ];

Advanced Patterns
Multi-Phase Commit

    // Phase 1: Prepare
    let prepare_constraints = vec![
        BatchConstraint::MaxRevisionForSelector(
            0,
            EventSelector::Subject(format!("tx-{}-prepared", tx_id))
        ),
    ];

    app.transact_batch(
        prepare_id,
        "my-events",
        vec![prepare_event],
        prepare_constraints
    ).await?;

    // Phase 2: Commit (after all participants prepared)
    let commit_constraints = vec![
        BatchConstraint::MinRevisionForSelector(
            1,
            EventSelector::Subject(format!("tx-{}-prepared", tx_id))
        ),
        BatchConstraint::MaxRevisionForSelector(
            0,
            EventSelector::Subject(format!("tx-{}-committed", tx_id))
        ),
    ];

    app.transact_batch(
        commit_id,
        "my-events",
        vec![commit_event],
        commit_constraints
    ).await?;

Linearizable Reads

    // Ensure read sees all writes up to a point.
    // `EventQuery` and the boxed error type are placeholders: the original
    // snippet left `query` undefined and used `?` in a function that did not
    // return a Result.
    async fn linearizable_read(
        app: &Application<_, _, _, _, _, _, _>,
        database: &str,
        write_revision: u64,
        query: &EventQuery,
    ) -> Result<Vec<Event>, Box<dyn std::error::Error>> {
        // Wait for database to catch up
        let database_state = app.await_database(database, write_revision).await?;

        // Query at the awaited revision
        Ok(app
            .query_events(database, database_state.revision(), query)
            .collect()
            .await)
    }

Next Steps
- Understand Bi-Temporal Indexing for efficient queries
- Learn about Transactions patterns
- Explore State Changes for command validation
- Implement State Views for derived state