Skip to content

Constraints

Constraints in EvidentSource provide optimistic concurrency control and consistency guarantees for batch transactions. They ensure database state meets expectations before committing events.

Constraints are conditions that must be satisfied for a batch to commit:

  • Checked atomically with the batch transaction
  • Provide optimistic concurrency control
  • Enable ordering guarantees
  • Support complex consistency requirements

Ensures the database (or specific selector) hasn’t changed since a known revision.

  • Read-modify-write patterns
  • Preventing lost updates
  • Ensuring sequential processing
// Database-wide constraint
BatchConstraint::MinRevision(42)
// Stream-specific constraint
BatchConstraint::MinRevisionForSelector(
100,
EventSelector::Stream("order-service")
)
message MinRevisionConstraint {
uint64 revision = 1;
optional EventSelector selector = 2;
}
  • If selector is omitted: Database must be at exactly the specified revision
  • If selector is provided: Only events matching the selector must be at the revision
  • Fails with FAILED_PRECONDITION if constraint is violated

Ensures no events exist beyond a specified revision for a selector.

  • Maintaining stream order
  • Preventing duplicate processing
  • Implementing exactly-once semantics
  • Ensuring append-only patterns
// Ensure stream hasn't progressed beyond revision 50
BatchConstraint::MaxRevisionForSelector(
50,
EventSelector::Stream("payment-service")
)
// Ensure subject has no events after revision 100
BatchConstraint::MaxRevisionForSelector(
100,
EventSelector::Subject("order-123")
)
  • Always requires a selector (no database-wide max revision)
  • Checks if any matching events exist after the specified revision
  • Useful for ensuring ordering within streams or subjects

Ensures a specific range of revisions exists in the database.

  • Dependency checking
  • Ensuring prerequisite events exist
  • Maintaining causal ordering
  • Validating event chains
// Ensure revisions 10-20 exist
BatchConstraint::RevisionRange(10, 20, None)
// Ensure specific stream has events in range
BatchConstraint::RevisionRange(
10,
20,
Some(EventSelector::Stream("inventory-service"))
)
  • Verifies all revisions in the range exist
  • Can be scoped to specific selectors
  • Useful for ensuring dependencies are met

Implement optimistic locking for aggregate updates:

// Load aggregate
let events = query_aggregate_events("order-123").await?;
let last_revision = events.last().map(|e| e.revision()).unwrap_or(0);
// Process command
let new_events = process_command(command, events)?;
// Save with concurrency check
let constraints = vec![
BatchConstraint::MaxRevisionForSelector(
last_revision,
EventSelector::Subject("order-123")
)
];
app.transact_batch(batch_id, "my-events", new_events, constraints).await?;

Ensure saga steps execute in order:

// Saga step 2 requires step 1 completion
let constraints = vec![
// Step 1 must have completed
BatchConstraint::MinRevisionForSelector(
step1_revision,
EventSelector::new()
.with_subject("saga-123")
.with_type("Step1Completed")
),
// No other steps have executed
BatchConstraint::MaxRevisionForSelector(
step1_revision,
EventSelector::Subject("saga-123")
),
];

Prevent duplicate event processing:

// Track processed message IDs
let constraints = vec![
// Ensure this message hasn't been processed
BatchConstraint::MaxRevisionForSelector(
0, // No events should exist
EventSelector::Subject(format!("message-{}", message_id))
),
];
let events = vec![
ProposedEvent::new(
format!("message-{}", message_id),
"message-processor",
"MessageProcessed",
json!({"processed_at": Utc::now()})
),
// ... actual business events
];
app.transact_batch(batch_id, "my-events", events, constraints).await?;

Write only if conditions are met:

// Only create if doesn't exist
let constraints = vec![
BatchConstraint::MaxRevisionForSelector(
0,
EventSelector::Subject("user-email@example.com")
),
];
// Only update if exists
let constraints = vec![
BatchConstraint::MinRevisionForSelector(
1, // At least one event must exist
EventSelector::Subject("user-123")
),
];
  1. All constraints are evaluated before any events are written
  2. Constraints see the same database state (snapshot isolation)
  3. If any constraint fails, no events are written
  4. Constraint checks are atomic with the batch transaction
// Efficient: Single constraint check
let constraints = vec![
BatchConstraint::MinRevision(database_revision),
];
// Less efficient: Multiple selector-based constraints
let constraints = vec![
BatchConstraint::MinRevisionForSelector(10, selector1),
BatchConstraint::MinRevisionForSelector(20, selector2),
BatchConstraint::MinRevisionForSelector(30, selector3),
];
// Optimize by combining when possible
let constraints = vec![
BatchConstraint::MinRevision(database_revision),
BatchConstraint::MaxRevisionForSelector(
last_stream_revision,
EventSelector::Stream("critical-stream")
),
];
use evidentsource_api::BatchTransactionError;
match app.transact_batch(batch_id, "my-events", events, constraints).await {
Ok(batch) => {
println!("Success at revision {}", batch.0.revision());
}
Err(BatchTransactionError::ConstraintViolation(violation)) => {
match violation {
ConstraintViolation::MinRevisionNotMet { expected, actual } => {
eprintln!("Database changed: expected {}, got {}", expected, actual);
}
ConstraintViolation::MaxRevisionExceeded { max, found } => {
eprintln!("Events exist beyond {}: found {}", max, found);
}
ConstraintViolation::RangeNotSatisfied { start, end } => {
eprintln!("Missing events in range {}-{}", start, end);
}
}
}
Err(e) => eprintln!("Transaction failed: {}", e),
}
/// Retry with exponential backoff on constraint violations
async fn transact_with_retry(
app: &Application<_, _, _, _, _, _, _>,
batch_id: Uuid,
database: &str,
events: Vec<ProposedEvent>,
max_retries: u32,
) -> Result<BatchDetail, BatchTransactionError> {
let mut retries = 0;
let mut backoff = Duration::from_millis(10);
loop {
// Get current state
let database_state = app.latest_database(database).await?;
let constraints = vec![
BatchConstraint::MinRevision(database_state.revision()),
];
match app.transact_batch(batch_id, database, events.clone(), constraints).await {
Ok(result) => return Ok(result.0),
Err(BatchTransactionError::ConstraintViolation(_)) if retries < max_retries => {
retries += 1;
tokio::time::sleep(backoff).await;
backoff *= 2; // Exponential backoff
continue;
}
Err(e) => return Err(e),
}
}
}
// For user commands: Check aggregate hasn't changed
BatchConstraint::MaxRevisionForSelector(
last_revision,
EventSelector::Subject(aggregate_id)
)
// For system processes: Check database state
BatchConstraint::MinRevision(expected_revision)
// For idempotent operations: Check hasn't been done
BatchConstraint::MaxRevisionForSelector(
0,
EventSelector::Subject(operation_id)
)
// Good: Specific selector
BatchConstraint::MaxRevisionForSelector(
100,
EventSelector::Subject("order-123")
)
// Avoid: Database-wide constraint when not needed
BatchConstraint::MinRevision(database_revision) // Blocks all concurrent writes
enum RetryStrategy {
Immediate,
ExponentialBackoff,
GiveUp,
}
fn determine_retry_strategy(error: &BatchTransactionError) -> RetryStrategy {
match error {
BatchTransactionError::ConstraintViolation(_) => RetryStrategy::ExponentialBackoff,
BatchTransactionError::InvalidBatch(_) => RetryStrategy::GiveUp,
_ => RetryStrategy::Immediate,
}
}
// Enforce business invariants
let constraints = vec![
// Account balance never goes negative
BatchConstraint::MinRevisionForSelector(
1, // Account must exist
EventSelector::new()
.with_subject(account_id)
.with_type("AccountCreated")
),
// No concurrent modifications
BatchConstraint::MaxRevisionForSelector(
last_balance_revision,
EventSelector::new()
.with_subject(account_id)
.with_type("BalanceUpdated")
),
];
// Phase 1: Prepare
let prepare_constraints = vec![
BatchConstraint::MaxRevisionForSelector(
0,
EventSelector::Subject(format!("tx-{}-prepared", tx_id))
),
];
app.transact_batch(
prepare_id,
"my-events",
vec![prepare_event],
prepare_constraints
).await?;
// Phase 2: Commit (after all participants prepared)
let commit_constraints = vec![
BatchConstraint::MinRevisionForSelector(
1,
EventSelector::Subject(format!("tx-{}-prepared", tx_id))
),
BatchConstraint::MaxRevisionForSelector(
0,
EventSelector::Subject(format!("tx-{}-committed", tx_id))
),
];
app.transact_batch(
commit_id,
"my-events",
vec![commit_event],
commit_constraints
).await?;
// Ensure read sees all writes up to a point
async fn linearizable_read(
app: &Application<_, _, _, _, _, _, _>,
database: &str,
write_revision: u64,
) -> Vec<Event> {
// Wait for database to catch up
let database = app.await_database(database, write_revision).await?;
// Query at the awaited revision
app.query_events(database, database.revision(), &query)
.collect()
.await
}