Constraints
Constraints API
Section titled “Constraints API”Constraints in EvidentSource provide optimistic concurrency control and consistency guarantees for transactions. They ensure database state meets expectations before committing events.
Overview
Section titled “Overview”Constraints are conditions that must be satisfied for a transaction to commit:
- Checked atomically with the transaction
- Provide optimistic concurrency control
- Enable ordering guarantees
- Support complex consistency requirements
Constraint Types
Section titled “Constraint Types”Min Constraint
Section titled “Min Constraint”Ensures a minimum number of matching events exist at or before a revision.
Use Cases
Section titled “Use Cases”- Read-modify-write patterns
- Preventing lost updates
- Ensuring sequential processing
Examples
Section titled “Examples”use evidentsource_core::domain::{AppendCondition, EventSelector};
// At least 42 events must match the selectorAppendCondition::Min(EventSelector::any(), 42)
// Stream-specific constraintAppendCondition::Min( EventSelector::stream_equals("order-service"), 100)gRPC Definition
Section titled “gRPC Definition”message MinRevisionConstraint { uint64 revision = 1; optional EventSelector selector = 2;}Behavior
Section titled “Behavior”- Checks that at least the specified number of events exist
- Can be scoped to specific selectors
- Fails with
FAILED_PRECONDITIONif constraint is violated
Max Constraint
Section titled “Max Constraint”Ensures no more than a specified number of events exist for a selector.
Use Cases
Section titled “Use Cases”- Maintaining stream order
- Preventing duplicate processing
- Implementing exactly-once semantics
- Ensuring append-only patterns
Examples
Section titled “Examples”use evidentsource_core::domain::{AppendCondition, EventSelector};
// Ensure stream hasn't progressed beyond revision 50AppendCondition::Max( EventSelector::stream_equals("payment-service"), 50)
// Ensure subject has no events (for uniqueness)AppendCondition::Max( EventSelector::subject_equals("order-123"), 0)
// Helper method for uniqueness checksAppendCondition::fail_if_events_match( EventSelector::subject_equals("order-123"))Behavior
Section titled “Behavior”- Requires a selector (scoped to specific events)
- Checks if any matching events exist beyond the specified count
- Useful for ensuring ordering within streams or subjects
Range Constraint
Section titled “Range Constraint”Ensures a specific count of events exists within bounds.
Use Cases
Section titled “Use Cases”- Dependency checking
- Ensuring prerequisite events exist
- Maintaining causal ordering
- Validating event chains
Examples
Section titled “Examples”use evidentsource_core::domain::{AppendCondition, EventSelector};
// Ensure exactly 10 events exist for selectorAppendCondition::at_revision( EventSelector::stream_equals("inventory-service"), 10)?Behavior
Section titled “Behavior”- Verifies event count is within the specified range
- Can be scoped to specific selectors
- Useful for ensuring dependencies are met
Complex Constraint Patterns
Section titled “Complex Constraint Patterns”Optimistic Locking
Section titled “Optimistic Locking”Implement optimistic locking for aggregate updates:
use evidentsource_core::domain::{AppendCondition, EventSelector};use nonempty::NonEmpty;
// Load aggregatelet db = conn.latest_database().await?;let events: Vec<_> = db.query_events( EventSelector::subject_equals("order-123"), QueryOptions::default()).collect().await;let last_revision = events.last().map(|e| e.revision()).unwrap_or(0);
// Process commandlet new_events = process_command(command, events)?;
// Save with concurrency checklet conditions = vec![ AppendCondition::unchanged_since( EventSelector::subject_equals("order-123"), last_revision )];
let result = conn.transact(NonEmpty::from_vec(new_events).unwrap(), conditions).await?;Distributed Saga Coordination
Section titled “Distributed Saga Coordination”Ensure saga steps execute in order:
use evidentsource_core::domain::{AppendCondition, EventSelector};
// Saga step 2 requires step 1 completionlet conditions = vec![ // Step 1 must have completed AppendCondition::must_exist( EventSelector::subject_equals("saga-123") .and(EventSelector::type_equals("Step1Completed")) ), // No other steps have executed beyond step 1 AppendCondition::Max( EventSelector::subject_equals("saga-123"), step1_revision ),];Exactly-Once Processing
Section titled “Exactly-Once Processing”Prevent duplicate event processing:
use evidentsource_core::domain::{AppendCondition, EventSelector};
// Track processed message IDs - ensure this message hasn't been processedlet conditions = vec![ AppendCondition::must_not_exist( EventSelector::subject_equals(format!("message-{}", message_id)) ),];
let events = NonEmpty::new( ProspectiveEvent::new( format!("message-{}", message_id), "message-processor", "MessageProcessed", json!({"processed_at": Utc::now()}) ));
let result = conn.transact(events, conditions).await?;Conditional Writes
Section titled “Conditional Writes”Write only if conditions are met:
use evidentsource_core::domain::{AppendCondition, EventSelector};
// Only create if doesn't existlet conditions = vec![ AppendCondition::must_not_exist( EventSelector::subject_equals("user-email@example.com") ),];
// Only update if existslet conditions = vec![ AppendCondition::must_exist( EventSelector::subject_equals("user-123") ),];Constraint Evaluation
Section titled “Constraint Evaluation”Evaluation Order
Section titled “Evaluation Order”- All constraints are evaluated before any events are written
- Constraints see the same database state (snapshot isolation)
- If any constraint fails, no events are written
- Constraint checks are atomic with the transaction
Performance Considerations
Section titled “Performance Considerations”use evidentsource_core::domain::{AppendCondition, EventSelector};
// Efficient: Single constraint checklet conditions = vec![ AppendCondition::Min(EventSelector::any(), database_revision),];
// Less efficient: Multiple selector-based constraintslet conditions = vec![ AppendCondition::Min(selector1, 10), AppendCondition::Min(selector2, 20), AppendCondition::Min(selector3, 30),];
// Optimize by combining when possiblelet conditions = vec![ AppendCondition::Min(EventSelector::any(), database_revision), AppendCondition::unchanged_since( EventSelector::stream_equals("critical-stream"), last_stream_revision ),];Error Handling
Section titled “Error Handling”Constraint Violations
Section titled “Constraint Violations”use evidentsource_core::domain::{DatabaseError, TransactionError};
match conn.transact(events, conditions).await { Ok(db) => { println!("Success at revision {}", db.revision()); } Err(DatabaseError::Transaction(TransactionError::ConstraintViolation(violation))) => { eprintln!("Constraint violated: {:?}", violation); } Err(e) => eprintln!("Transaction failed: {}", e),}Retry Strategies
Section titled “Retry Strategies”use evidentsource_core::domain::{AppendCondition, DatabaseError, EventSelector, TransactionError};use std::time::Duration;
/// Retry with exponential backoff on constraint violationsasync fn transact_with_retry( conn: &Connection, events: NonEmpty<ProspectiveEvent>, max_retries: u32,) -> Result<DatabaseAtRevision, DatabaseError> { let mut retries = 0; let mut backoff = Duration::from_millis(10);
loop { // Get current state let db = conn.latest_database().await?; let conditions = vec![ AppendCondition::Min(EventSelector::any(), db.revision()), ];
match conn.transact(events.clone(), conditions).await { Ok(result) => return Ok(result), Err(DatabaseError::Transaction(TransactionError::ConstraintViolation(_))) if retries < max_retries => { retries += 1; tokio::time::sleep(backoff).await; backoff *= 2; // Exponential backoff continue; } Err(e) => return Err(e), } }}Best Practices
Section titled “Best Practices”1. Choose Appropriate Constraints
Section titled “1. Choose Appropriate Constraints”use evidentsource_core::domain::{AppendCondition, EventSelector};
// For user commands: Check aggregate hasn't changedAppendCondition::unchanged_since( EventSelector::subject_equals(aggregate_id), last_revision)
// For system processes: Check database stateAppendCondition::Min(EventSelector::any(), expected_revision)
// For idempotent operations: Check hasn't been doneAppendCondition::must_not_exist( EventSelector::subject_equals(operation_id))2. Minimize Constraint Scope
Section titled “2. Minimize Constraint Scope”use evidentsource_core::domain::{AppendCondition, EventSelector};
// Good: Specific selectorAppendCondition::Max( EventSelector::subject_equals("order-123"), 100)
// Avoid: Database-wide constraint when not neededAppendCondition::Min(EventSelector::any(), database_revision) // Blocks all concurrent writes3. Handle Violations Gracefully
Section titled “3. Handle Violations Gracefully”use evidentsource_core::domain::{DatabaseError, TransactionError};
enum RetryStrategy { Immediate, ExponentialBackoff, GiveUp,}
fn determine_retry_strategy(error: &DatabaseError) -> RetryStrategy { match error { DatabaseError::Transaction(TransactionError::ConstraintViolation(_)) => { RetryStrategy::ExponentialBackoff } DatabaseError::Transaction(TransactionError::InvalidEvents(_)) => { RetryStrategy::GiveUp } _ => RetryStrategy::Immediate, }}4. Use Constraints for Business Rules
Section titled “4. Use Constraints for Business Rules”use evidentsource_core::domain::{AppendCondition, EventSelector};
// Enforce business invariantslet conditions = vec![ // Account must exist AppendCondition::must_exist( EventSelector::subject_equals(account_id) .and(EventSelector::type_equals("AccountCreated")) ), // No concurrent modifications AppendCondition::unchanged_since( EventSelector::subject_equals(account_id) .and(EventSelector::type_equals("BalanceUpdated")), last_balance_revision ),];Advanced Patterns
Section titled “Advanced Patterns”Multi-Phase Commit
Section titled “Multi-Phase Commit”use evidentsource_core::domain::{AppendCondition, EventSelector};
// Phase 1: Preparelet prepare_conditions = vec![ AppendCondition::must_not_exist( EventSelector::subject_equals(format!("tx-{}-prepared", tx_id)) ),];
conn.transact(NonEmpty::new(prepare_event), prepare_conditions).await?;
// Phase 2: Commit (after all participants prepared)let commit_conditions = vec![ AppendCondition::must_exist( EventSelector::subject_equals(format!("tx-{}-prepared", tx_id)) ), AppendCondition::must_not_exist( EventSelector::subject_equals(format!("tx-{}-committed", tx_id)) ),];
conn.transact(NonEmpty::new(commit_event), commit_conditions).await?;Linearizable Reads
Section titled “Linearizable Reads”// Ensure read sees all writes up to a pointasync fn linearizable_read( conn: &Connection, write_revision: u64, query: &EventSelector,) -> Result<Vec<Event>, DatabaseError> { // Wait for database to catch up let db = conn.database_at_revision(write_revision).await?;
// Query at the awaited revision Ok(db.query_events(query.clone(), QueryOptions::default()) .collect() .await)}Next Steps
Section titled “Next Steps”- Understand Bi-Temporal Indexing for efficient queries
- Learn about Transactions patterns
- Explore State Changes for command validation
- Implement State Views for derived state