
Transactions

Transactions are the fundamental write operation in EvidentSource. They provide atomic, consistent, and durable storage of events with optimistic concurrency control.

A transaction:

  • Atomically commits multiple events (all succeed or all fail)
  • Enforces consistency through constraints
  • Provides idempotence through transaction IDs
  • Assigns monotonically increasing revisions
  • Captures transaction and effective timestamps
```protobuf
rpc Transact(TransactionRequest) returns (TransactionReply) {}

message TransactionRequest {
  string database_name = 1;
  repeated io.cloudevents.v1.CloudEvent events = 2;
  repeated TransactionConstraint constraints = 3;
  string transaction_id = 4; // Client-provided UUID for idempotence
}
```
```http
POST /api/v1/db/{database}
Content-Type: application/json

{
  "transaction_id": "550e8400-e29b-41d4-a716-446655440000",
  "events": [
    {
      "id": "evt-001",
      "source": "order-service",
      "specversion": "1.0",
      "type": "OrderCreated",
      "subject": "order-123",
      "datacontenttype": "application/json",
      "data": {
        "orderId": "order-123",
        "customerId": "cust-456",
        "total": 99.99
      }
    }
  ],
  "constraints": [
    {
      "min_revision": {
        "revision": 42
      }
    }
  ]
}
```
```rust
use uuid::Uuid;
use evidentsource_domain::core::state_change::transaction::{
    ProposedEvent, TransactionConstraint,
};

let transaction_id = Uuid::new_v4();

let events = vec![
    ProposedEvent::new(
        "order-123",
        "order-service",
        "OrderCreated",
        serde_json::json!({
            "orderId": "order-123",
            "customerId": "cust-456",
            "total": 99.99
        }),
    ),
];

let constraints = vec![
    TransactionConstraint::MinRevision(42),
];

let (transaction, id_mappings) = app.transact(
    transaction_id,
    "my-events",
    events,
    constraints,
).await?;

println!("Transaction committed at revision {}", transaction.revision());
```

All events must conform to the CloudEvents specification:

Required attributes:

| Field | Description | Example |
| --- | --- | --- |
| `id` | Unique event identifier | `"evt-001"` |
| `source` | Event origin/producer | `"order-service"` |
| `specversion` | CloudEvents version | `"1.0"` |
| `type` | Event type | `"OrderCreated"` |

Optional attributes:

| Field | Description | Example |
| --- | --- | --- |
| `subject` | Event subject/aggregate | `"order-123"` |
| `datacontenttype` | Data format | `"application/json"` |
| `data` | Event payload | `{"orderId": "order-123"}` |
| `time` | Event occurrence time | `"2024-01-15T10:00:00Z"` |
```json
{
  "id": "evt-001",
  "source": "order-service",
  "specversion": "1.0",
  "type": "OrderCreated",
  "subject": "order-123",
  "datacontenttype": "application/json",
  "time": "2024-01-15T10:00:00Z",
  "data": {
    "orderId": "order-123",
    "customerId": "cust-456",
    "items": [
      {"sku": "WIDGET-1", "quantity": 2, "price": 29.99},
      {"sku": "GADGET-2", "quantity": 1, "price": 39.99}
    ],
    "total": 99.97,
    "status": "pending"
  }
}
```
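Since a malformed event is rejected with `INVALID_ARGUMENT`, it pays to check the required attributes client-side before submitting. The following is a minimal, illustrative sketch of such a check; the `CloudEvent` struct here is a stand-in for demonstration, not the EvidentSource API type, and `data` is simplified to a raw JSON string.

```rust
// Hypothetical pre-submission check for the four required CloudEvents
// attributes. Illustrative types only, not the EvidentSource API.
#[derive(Debug, Clone)]
struct CloudEvent {
    id: String,
    source: String,
    specversion: String,
    r#type: String,
    subject: Option<String>,
    data: Option<String>, // JSON payload as a string, for simplicity
}

fn validate(event: &CloudEvent) -> Result<(), String> {
    if event.id.is_empty() {
        return Err("id is required".into());
    }
    if event.source.is_empty() {
        return Err("source is required".into());
    }
    if event.specversion != "1.0" {
        return Err(format!("unsupported specversion: {}", event.specversion));
    }
    if event.r#type.is_empty() {
        return Err("type is required".into());
    }
    Ok(())
}

fn main() {
    let event = CloudEvent {
        id: "evt-001".into(),
        source: "order-service".into(),
        specversion: "1.0".into(),
        r#type: "OrderCreated".into(),
        subject: Some("order-123".into()),
        data: Some(r#"{"orderId":"order-123"}"#.into()),
    };
    assert!(validate(&event).is_ok());

    let mut bad = event.clone();
    bad.id.clear();
    assert!(validate(&bad).is_err()); // missing id is caught before submission
}
```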

Constraints ensure database consistency and ordering guarantees:

Ensures the database hasn’t changed since your last read:

```rust
// Only commit if the database is still at revision 42
TransactionConstraint::MinRevision(42)
```

Use cases:

  • Read-modify-write patterns
  • Preventing lost updates
  • Ensuring sequential processing
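To make the read-modify-write behavior concrete, here is a self-contained sketch of the check the constraint implies, using an in-memory stand-in for the database. The `Database` type is a mock, and the equality check follows one plausible reading of the comment above ("still at revision 42"); it is not the EvidentSource implementation.

```rust
// Illustrative stand-in for the server-side MinRevision check: the
// commit is rejected when the database has moved past the revision
// the client read, which prevents lost updates.
struct Database {
    revision: u64,
    events: Vec<String>,
}

impl Database {
    fn transact_min_revision(
        &mut self,
        min_revision: u64,
        events: Vec<String>,
    ) -> Result<u64, String> {
        // Reject if another writer advanced the database since our read.
        if self.revision != min_revision {
            return Err(format!(
                "constraint violation: expected revision {}, found {}",
                min_revision, self.revision
            ));
        }
        self.events.extend(events);
        self.revision += 1;
        Ok(self.revision)
    }
}

fn main() {
    let mut db = Database { revision: 42, events: Vec::new() };

    // The client read revision 42, so the first commit succeeds.
    assert_eq!(db.transact_min_revision(42, vec!["OrderCreated".into()]), Ok(43));

    // A second writer holding the stale revision 42 is rejected.
    assert!(db.transact_min_revision(42, vec!["OrderUpdated".into()]).is_err());
}
```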

Ensures events are appended in order:

```rust
// Only commit if no other events have been added to this stream
TransactionConstraint::MaxRevision(10, Some(EventSelector::Stream("order-service")))
```

Use cases:

  • Maintaining stream order
  • Preventing duplicate processing
  • Implementing exactly-once semantics
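The stream-scoped form of the check can be sketched as follows. The `StreamHead` type and the evaluation function are illustrative assumptions about what a selector-scoped MaxRevision test looks like, not the EvidentSource implementation.

```rust
// Illustrative check: a MaxRevision constraint scoped to a stream is
// satisfied only if no event in that stream sits above the given
// revision. Other streams' progress does not affect it.
struct StreamHead {
    source: &'static str,
    revision: u64,
}

fn max_revision_satisfied(heads: &[StreamHead], source: &str, max_revision: u64) -> bool {
    heads
        .iter()
        .filter(|h| h.source == source)
        .all(|h| h.revision <= max_revision)
}

fn main() {
    let heads = [
        StreamHead { source: "order-service", revision: 10 },
        StreamHead { source: "billing-service", revision: 17 },
    ];

    // order-service has nothing beyond revision 10: safe to append.
    assert!(max_revision_satisfied(&heads, "order-service", 10));

    // billing-service has advanced to 17, so a cap of 10 is violated.
    assert!(!max_revision_satisfied(&heads, "billing-service", 10));
}
```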

Ensures a specific revision range exists:

```rust
// Ensure revisions 10-20 exist before committing
TransactionConstraint::RevisionRange(10, 20, None)
```

Use cases:

  • Dependency checking
  • Ensuring prerequisite events exist
  • Maintaining causal ordering

Transaction IDs provide exactly-once semantics:

```rust
let transaction_id = Uuid::new_v4();

// First attempt
let result1 = app.transact(transaction_id, "my-events", events.clone(), vec![]).await?;

// Retrying with the same transaction_id returns the same result without duplicating events
let result2 = app.transact(transaction_id, "my-events", events, vec![]).await?;

assert_eq!(result1.0.revision(), result2.0.revision());
```

Best practices:

  • Generate transaction IDs on the client side
  • Store transaction IDs for retry logic
  • Use deterministic IDs for replayable operations
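The guarantee amounts to a commit log keyed by transaction ID: a replayed ID observes the original commit instead of appending again. The sketch below models that with an in-memory `Log`; it is a mock for illustration, not how the server stores transactions.

```rust
use std::collections::HashMap;

// Mock of the idempotence guarantee: committed transaction IDs map to
// the revision they produced, so a retry returns the original result.
struct Log {
    revision: u64,
    committed: HashMap<String, u64>, // transaction_id -> revision
}

impl Log {
    fn transact(&mut self, transaction_id: &str, event_count: u64) -> u64 {
        if let Some(&revision) = self.committed.get(transaction_id) {
            return revision; // retry: same result, no duplicate events
        }
        self.revision += event_count;
        self.committed.insert(transaction_id.to_string(), self.revision);
        self.revision
    }
}

fn main() {
    let mut log = Log { revision: 42, committed: HashMap::new() };

    let first = log.transact("550e8400-e29b-41d4-a716-446655440000", 1);
    let retry = log.transact("550e8400-e29b-41d4-a716-446655440000", 1);

    assert_eq!(first, 43);
    assert_eq!(retry, 43);        // the retry observes the original commit
    assert_eq!(log.revision, 43); // no duplicate events were appended
}
```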
```json
{
  "transaction": {
    "database": "my-events",
    "revision": 43,
    "transaction_timestamp": "2024-01-15T10:00:00Z",
    "event_count": 1
  },
  "event_id_mappings": [
    {
      "client_event_id": "evt-001",
      "stored_event_id": "evt-001"
    }
  ]
}
```

The system may normalize or modify event IDs:

  • Ensures uniqueness within streams
  • Handles ID conflicts
  • Maintains referential integrity

Use the returned mappings to track events after storage.
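A simple way to do that is to fold the mappings into a lookup keyed by client ID. The sketch below assumes the mappings arrive as (client ID, stored ID) pairs; the normalized ID `"evt-002-1"` is a made-up illustration of a conflict rename, not a documented format.

```rust
use std::collections::HashMap;

// Build a client-ID -> stored-ID lookup from the reply's
// event_id_mappings (modeled here as plain string pairs).
fn stored_ids(mappings: &[(String, String)]) -> HashMap<String, String> {
    mappings.iter().cloned().collect()
}

fn main() {
    let mappings = vec![
        ("evt-001".to_string(), "evt-001".to_string()),
        ("evt-002".to_string(), "evt-002-1".to_string()), // hypothetical conflict rename
    ];

    let lookup = stored_ids(&mappings);
    assert_eq!(lookup["evt-001"], "evt-001");
    assert_eq!(lookup["evt-002"], "evt-002-1");
}
```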

| Error Code | Description | Resolution |
| --- | --- | --- |
| `NOT_FOUND` | Database doesn't exist | Verify database name |
| `INVALID_ARGUMENT` | Malformed event or constraint | Check CloudEvents format |
| `FAILED_PRECONDITION` | Constraint violation | Retry with updated constraints |
| `RESOURCE_EXHAUSTED` | Transaction too large | Split into smaller transactions |
```rust
use evidentsource_api::TransactionError;

let result = loop {
    // Get the latest revision
    let database = app.latest_database("my-events").await?;
    let current_revision = database.revision();

    // Attempt the transaction against the current revision
    let constraints = vec![TransactionConstraint::MinRevision(current_revision)];
    match app.transact(transaction_id, "my-events", events.clone(), constraints).await {
        Ok(result) => break result,
        Err(TransactionError::ConstraintViolation(_)) => {
            // The database changed; retry with the new revision
            continue;
        }
        Err(e) => return Err(e.into()),
    }
};
```
  • Optimal transaction size: 10-100 events
  • Maximum transaction size: 1000 events or 1MB
  • Larger transactions increase latency but improve throughput
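When a batch exceeds the cap, it has to be split into several transactions, each with its own transaction ID. A minimal sketch of the chunking step, using plain strings as stand-ins for events:

```rust
// Split a large batch into transaction-sized chunks. The cap of 100
// sits in the suggested 10-100 range; 1000 is the stated hard limit.
fn chunk_events(events: Vec<String>, max_per_transaction: usize) -> Vec<Vec<String>> {
    events
        .chunks(max_per_transaction)
        .map(|chunk| chunk.to_vec())
        .collect()
}

fn main() {
    let events: Vec<String> = (0..250).map(|i| format!("evt-{i:03}")).collect();
    let batches = chunk_events(events, 100);

    assert_eq!(batches.len(), 3);     // 100 + 100 + 50
    assert_eq!(batches[2].len(), 50); // the remainder forms the last transaction
}
```

Note that splitting trades atomicity for size: each chunk commits independently, so events that must succeed or fail together belong in the same chunk.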
```rust
// Group related events together in a single transaction
let events = vec![
    order_created_event,
    inventory_reserved_event,
    payment_initiated_event,
];

// Single transaction instead of three
app.transact(transaction_id, "my-events", events, constraints).await?;
```
  1. Optimistic: No constraints for append-only workloads
  2. Pessimistic: MinRevision for consistency-critical operations
  3. Hybrid: Constraints only for specific streams or subjects
```rust
// Saga step with compensation tracking
let saga_id = Uuid::new_v4();

let events = vec![
    ProposedEvent::new(
        saga_id.to_string(),
        "saga-orchestrator",
        "SagaStarted",
        json!({"steps": ["reserve", "charge", "ship"]}),
    ),
    ProposedEvent::new(
        order_id,
        "order-service",
        "OrderReserved",
        json!({"sagaId": saga_id}),
    ),
];

app.transact(transaction_id, "my-events", events, vec![]).await?;
```
```rust
// Load aggregate state
let events = app.query_events(
    "my-events",
    latest_revision,
    &DatabaseQuery::new()
        .with_selector(EventSelector::Subject("order-123"))
).collect().await?;
let aggregate = OrderAggregate::from_events(events);

// Generate new events from the command
let new_events = aggregate.handle_command(command)?;

// Persist with a consistency check
let constraints = vec![
    TransactionConstraint::MaxRevision(
        aggregate.version(),
        Some(EventSelector::Subject("order-123"))
    ),
];
app.transact(transaction_id, "my-events", new_events, constraints).await?;
```
```rust
// Two-phase commit pattern
let prepare_events = vec![
    ProposedEvent::new(tx_id, "coordinator", "TransactionPrepared", data),
];

// Phase 1: Prepare
app.transact(prepare_id, "my-events", prepare_events, vec![]).await?;

// Phase 2: Commit or abort based on participant responses
let decision_events = vec![
    ProposedEvent::new(tx_id, "coordinator", "TransactionCommitted", data),
];
app.transact(commit_id, "my-events", decision_events, vec![]).await?;
```

Each transaction captures metadata about its origin and context. This metadata is stored with the transaction and can be used for auditing, debugging, and analytics.

The system automatically captures how each transaction was submitted:

| Origin | Description |
| --- | --- |
| `HttpTransaction` | Direct transaction via HTTP API |
| `HttpStateChange` | State change execution via HTTP API |
| `GrpcTransaction` | Direct transaction via gRPC API |
| `GrpcStateChange` | State change execution via gRPC API |
| `KafkaTransaction` | Async transaction via Kafka |
| `KafkaStateChange` | Async state change execution via Kafka |

For Kafka-originated transactions, additional tracking information is captured:

```json
{
  "topic": "transaction-proposals",
  "partition": 3,
  "offset": 12847,
  "correlation_id": "550e8400-e29b-41d4-a716-446655440000"
}
```

This enables:

  • Message tracing: Track transactions back to specific Kafka messages
  • Replay debugging: Identify exact message for replay scenarios
  • Correlation: Match responses to original requests
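The correlation step can be as simple as a pending-request map keyed by `correlation_id`, consulted when a reply arrives. A minimal sketch under that assumption (the map contents are illustrative):

```rust
use std::collections::HashMap;

// Match an async reply to its original request via the correlation_id
// carried in the Kafka tracking metadata.
fn take_pending<'a>(
    pending: &mut HashMap<String, &'a str>,
    correlation_id: &str,
) -> Option<&'a str> {
    pending.remove(correlation_id)
}

fn main() {
    let mut pending = HashMap::new();
    pending.insert(
        "550e8400-e29b-41d4-a716-446655440000".to_string(),
        "order-123 proposal",
    );

    // A reply arrives carrying the correlation_id from its metadata.
    let request = take_pending(&mut pending, "550e8400-e29b-41d4-a716-446655440000");

    assert_eq!(request, Some("order-123 proposal"));
    assert!(pending.is_empty()); // the request is no longer outstanding
}
```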
| Field | Description |
| --- | --- |
| `last_read_revision` | Database revision read before submitting (for causality tracking) |
| `principal_attributes` | Key-value pairs for identity/authorization context |
| `commit_message` | Optional human-readable description of the change |
  1. Group Related Events: Combine causally related events in a single transaction
  2. Use Meaningful IDs: Include domain context in event IDs for debugging
  3. Set Time Fields: Provide time field for accurate effective-time queries
  4. Validate Early: Validate events before attempting transaction
  5. Handle Failures: Implement retry logic with exponential backoff
  6. Monitor Metrics: Track transaction sizes, revision gaps, and constraint violations
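Practice 5 can be sketched as a small retry helper with exponential backoff. The closure stands in for a transact call; in real code the `Err` arm would also inspect the error kind (for example, only retrying `FAILED_PRECONDITION`-style failures) before sleeping.

```rust
use std::time::Duration;

// Retry a fallible operation, doubling the delay between attempts.
fn retry_with_backoff<T, E>(
    mut attempt: impl FnMut() -> Result<T, E>,
    max_attempts: u32,
    base_delay: Duration,
) -> Result<T, E> {
    let mut delay = base_delay;
    let mut last_err = None;
    for n in 0..max_attempts {
        match attempt() {
            Ok(value) => return Ok(value),
            Err(e) => {
                last_err = Some(e);
                if n + 1 < max_attempts {
                    std::thread::sleep(delay);
                    delay *= 2; // exponential backoff
                }
            }
        }
    }
    Err(last_err.expect("max_attempts must be > 0"))
}

fn main() {
    // Simulated transact: fails twice, then succeeds on the third attempt.
    let mut calls = 0;
    let result = retry_with_backoff(
        || {
            calls += 1;
            if calls < 3 { Err("constraint violation") } else { Ok(43u64) }
        },
        5,
        Duration::from_millis(1),
    );

    assert_eq!(result, Ok(43)); // eventually committed at revision 43
    assert_eq!(calls, 3);
}
```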