Skip to content

Constraints

Constraints in EvidentSource provide optimistic concurrency control and consistency guarantees for transactions. They ensure database state meets expectations before committing events.

Constraints are conditions that must be satisfied for a transaction to commit:

  • Checked atomically with the transaction
  • Provide optimistic concurrency control
  • Enable ordering guarantees
  • Support complex consistency requirements

Ensures a minimum number of matching events exist at or before a revision.

  • Read-modify-write patterns
  • Preventing lost updates
  • Ensuring sequential processing
use evidentsource_core::domain::{AppendCondition, EventSelector};
// At least 42 events must match the selector
AppendCondition::Min(EventSelector::any(), 42)
// Stream-specific constraint
AppendCondition::Min(
EventSelector::stream_equals("order-service"),
100
)
message MinRevisionConstraint {
uint64 revision = 1;
optional EventSelector selector = 2;
}
  • Checks that at least the specified number of events exist
  • Can be scoped to specific selectors
  • Fails with FAILED_PRECONDITION if the constraint is violated

Ensures no more than a specified number of events exist for a selector.

  • Maintaining stream order
  • Preventing duplicate processing
  • Implementing exactly-once semantics
  • Ensuring append-only patterns
use evidentsource_core::domain::{AppendCondition, EventSelector};
// Ensure stream hasn't progressed beyond revision 50
AppendCondition::Max(
EventSelector::stream_equals("payment-service"),
50
)
// Ensure subject has no events (for uniqueness)
AppendCondition::Max(
EventSelector::subject_equals("order-123"),
0
)
// Helper method for uniqueness checks
AppendCondition::fail_if_events_match(
EventSelector::subject_equals("order-123")
)
  • Requires a selector (scoped to specific events)
  • Checks if any matching events exist beyond the specified count
  • Useful for ensuring ordering within streams or subjects

Ensures the number of matching events falls within the specified bounds.

  • Dependency checking
  • Ensuring prerequisite events exist
  • Maintaining causal ordering
  • Validating event chains
use evidentsource_core::domain::{AppendCondition, EventSelector};
// Ensure exactly 10 events exist for selector
AppendCondition::at_revision(
EventSelector::stream_equals("inventory-service"),
10
)?
  • Verifies event count is within the specified range
  • Can be scoped to specific selectors
  • Useful for ensuring dependencies are met

Implement optimistic locking for aggregate updates:

use evidentsource_core::domain::{AppendCondition, EventSelector};
use nonempty::NonEmpty;
// Load the aggregate: read every event recorded for the subject from the latest database view
let db = conn.latest_database().await?;
let events: Vec<_> = db.query_events(
EventSelector::subject_equals("order-123"),
QueryOptions::default()
).collect().await;
// Revision of the last event returned for this subject; 0 when no events were returned
let last_revision = events.last().map(|e| e.revision()).unwrap_or(0);
// Process the command against the loaded history to produce the events to append
let new_events = process_command(command, events)?;
// Save with a concurrency check: the condition is built from the same subject selector
// and the revision observed above
let conditions = vec![
AppendCondition::unchanged_since(
EventSelector::subject_equals("order-123"),
last_revision
)
];
// NOTE(review): `.unwrap()` panics if `NonEmpty::from_vec` does not yield a value —
// presumably when `new_events` is empty. Confirm `process_command` always emits at
// least one event, or handle the empty case before calling `transact`.
let result = conn.transact(NonEmpty::from_vec(new_events).unwrap(), conditions).await?;

Ensure saga steps execute in order:

use evidentsource_core::domain::{AppendCondition, EventSelector};
// Saga step 2 requires step 1 completion
let conditions = vec![
// Step 1 must have completed
AppendCondition::must_exist(
EventSelector::subject_equals("saga-123")
.and(EventSelector::type_equals("Step1Completed"))
),
// No other steps have executed beyond step 1
AppendCondition::Max(
EventSelector::subject_equals("saga-123"),
step1_revision
),
];

Prevent duplicate event processing:

use evidentsource_core::domain::{AppendCondition, EventSelector};
// Track processed message IDs - ensure this message hasn't been processed
let conditions = vec![
AppendCondition::must_not_exist(
EventSelector::subject_equals(format!("message-{}", message_id))
),
];
let events = NonEmpty::new(
ProspectiveEvent::new(
format!("message-{}", message_id),
"message-processor",
"MessageProcessed",
json!({"processed_at": Utc::now()})
)
);
let result = conn.transact(events, conditions).await?;

Write only if conditions are met:

use evidentsource_core::domain::{AppendCondition, EventSelector};
// Only create if it doesn't already exist
let conditions = vec![
AppendCondition::must_not_exist(
EventSelector::subject_equals("user-email@example.com")
),
];
// Only update if exists
let conditions = vec![
AppendCondition::must_exist(
EventSelector::subject_equals("user-123")
),
];
  1. All constraints are evaluated before any events are written
  2. Constraints see the same database state (snapshot isolation)
  3. If any constraint fails, no events are written
  4. Constraint checks are atomic with the transaction
use evidentsource_core::domain::{AppendCondition, EventSelector};
// Efficient: Single constraint check
let conditions = vec![
AppendCondition::Min(EventSelector::any(), database_revision),
];
// Less efficient: Multiple selector-based constraints
let conditions = vec![
AppendCondition::Min(selector1, 10),
AppendCondition::Min(selector2, 20),
AppendCondition::Min(selector3, 30),
];
// Optimize by combining when possible
let conditions = vec![
AppendCondition::Min(EventSelector::any(), database_revision),
AppendCondition::unchanged_since(
EventSelector::stream_equals("critical-stream"),
last_stream_revision
),
];
use evidentsource_core::domain::{DatabaseError, TransactionError};
match conn.transact(events, conditions).await {
Ok(db) => {
println!("Success at revision {}", db.revision());
}
Err(DatabaseError::Transaction(TransactionError::ConstraintViolation(violation))) => {
eprintln!("Constraint violated: {:?}", violation);
}
Err(e) => eprintln!("Transaction failed: {}", e),
}
use evidentsource_core::domain::{AppendCondition, DatabaseError, EventSelector, TransactionError};
use std::time::Duration;
/// Appends `events`, retrying with exponential backoff when a constraint is violated.
///
/// Every attempt re-reads the latest database and conditions the append on
/// `AppendCondition::Min(EventSelector::any(), <revision just read>)`. After a
/// constraint violation the function sleeps and tries again, at most
/// `max_retries` times after the initial attempt. The delay starts at 10 ms
/// and doubles after each retry.
///
/// # Errors
///
/// Returns any error from `latest_database`, any non-constraint error from
/// `transact`, and the constraint violation itself once the retries are used up.
async fn transact_with_retry(
    conn: &Connection,
    events: NonEmpty<ProspectiveEvent>,
    max_retries: u32,
) -> Result<DatabaseAtRevision, DatabaseError> {
    let mut retries_left = max_retries;
    let mut delay = Duration::from_millis(10);

    loop {
        // Re-read the current state so each attempt uses a fresh revision.
        let snapshot = conn.latest_database().await?;
        let guard = vec![AppendCondition::Min(EventSelector::any(), snapshot.revision())];

        let outcome = conn.transact(events.clone(), guard).await;
        let violated = matches!(
            &outcome,
            Err(DatabaseError::Transaction(TransactionError::ConstraintViolation(_)))
        );

        // Success, a non-retryable error, or an exhausted budget all end the loop.
        if !violated || retries_left == 0 {
            return outcome;
        }

        retries_left -= 1;
        tokio::time::sleep(delay).await;
        delay *= 2; // double the wait before the next attempt
    }
}
use evidentsource_core::domain::{AppendCondition, EventSelector};
// For user commands: Check aggregate hasn't changed
AppendCondition::unchanged_since(
EventSelector::subject_equals(aggregate_id),
last_revision
)
// For system processes: Check database state
AppendCondition::Min(EventSelector::any(), expected_revision)
// For idempotent operations: Check the operation hasn't already been done
AppendCondition::must_not_exist(
EventSelector::subject_equals(operation_id)
)
use evidentsource_core::domain::{AppendCondition, EventSelector};
// Good: Specific selector
AppendCondition::Max(
EventSelector::subject_equals("order-123"),
100
)
// Avoid: Database-wide constraint when not needed
AppendCondition::Min(EventSelector::any(), database_revision) // Blocks all concurrent writes
use evidentsource_core::domain::{DatabaseError, TransactionError};
/// How a caller should react after a transaction attempt fails.
///
/// The derives make the strategy cheap to copy, comparable in assertions and
/// branching code, and printable in logs.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RetryStrategy {
    /// Retry right away, without waiting.
    Immediate,
    /// Retry after a delay that grows with each attempt.
    ExponentialBackoff,
    /// Do not retry.
    GiveUp,
}
/// Chooses the retry policy for a failed transaction based on its error.
///
/// * A constraint violation maps to [`RetryStrategy::ExponentialBackoff`].
/// * Invalid events map to [`RetryStrategy::GiveUp`].
/// * Every other error, transactional or not, maps to [`RetryStrategy::Immediate`].
fn determine_retry_strategy(error: &DatabaseError) -> RetryStrategy {
    match error {
        DatabaseError::Transaction(cause) => match cause {
            TransactionError::ConstraintViolation(_) => RetryStrategy::ExponentialBackoff,
            TransactionError::InvalidEvents(_) => RetryStrategy::GiveUp,
            _ => RetryStrategy::Immediate,
        },
        _ => RetryStrategy::Immediate,
    }
}
use evidentsource_core::domain::{AppendCondition, EventSelector};
// Enforce business invariants
let conditions = vec![
// Account must exist
AppendCondition::must_exist(
EventSelector::subject_equals(account_id)
.and(EventSelector::type_equals("AccountCreated"))
),
// No concurrent modifications
AppendCondition::unchanged_since(
EventSelector::subject_equals(account_id)
.and(EventSelector::type_equals("BalanceUpdated")),
last_balance_revision
),
];
use evidentsource_core::domain::{AppendCondition, EventSelector};
// Phase 1: Prepare
let prepare_conditions = vec![
AppendCondition::must_not_exist(
EventSelector::subject_equals(format!("tx-{}-prepared", tx_id))
),
];
conn.transact(NonEmpty::new(prepare_event), prepare_conditions).await?;
// Phase 2: Commit (after all participants prepared)
let commit_conditions = vec![
AppendCondition::must_exist(
EventSelector::subject_equals(format!("tx-{}-prepared", tx_id))
),
AppendCondition::must_not_exist(
EventSelector::subject_equals(format!("tx-{}-committed", tx_id))
),
];
conn.transact(NonEmpty::new(commit_event), commit_conditions).await?;
/// Reads the events matching `query` so that the result reflects all writes up to
/// `write_revision`.
///
/// The database view is obtained with `database_at_revision(write_revision)`, which
/// waits for the database to catch up to that revision, and the query then runs
/// against that view with default `QueryOptions`.
///
/// # Errors
///
/// Returns the error from `database_at_revision` if the view cannot be obtained.
async fn linearizable_read(
    conn: &Connection,
    write_revision: u64,
    query: &EventSelector,
) -> Result<Vec<Event>, DatabaseError> {
    // Block until the requested revision is available.
    let snapshot = conn.database_at_revision(write_revision).await?;

    // Run the query against the awaited revision and gather every match.
    let matched: Vec<Event> = snapshot
        .query_events(query.clone(), QueryOptions::default())
        .collect()
        .await;

    Ok(matched)
}