# Transactions

## Transactions API

Transactions are the fundamental write operation in EvidentSource. They provide atomic, consistent, and durable storage of events with optimistic concurrency control.
## Overview

A transaction:
- Atomically commits multiple events (all succeed or all fail)
- Enforces consistency through constraints
- Provides idempotence through transaction IDs
- Assigns monotonically increasing revisions
- Captures transaction and effective timestamps
## Transacting Events

### gRPC API

```protobuf
rpc Transact(TransactionRequest) returns (TransactionReply) {}

message TransactionRequest {
  string database_name = 1;
  repeated io.cloudevents.v1.CloudEvent events = 2;
  repeated TransactionConstraint constraints = 3;
  string transaction_id = 4; // Client-provided UUID for idempotence
}
```

### HTTP API

```http
POST /api/v1/db/{database}
Content-Type: application/json
```

```json
{
  "transaction_id": "550e8400-e29b-41d4-a716-446655440000",
  "events": [
    {
      "id": "evt-001",
      "source": "order-service",
      "specversion": "1.0",
      "type": "OrderCreated",
      "subject": "order-123",
      "datacontenttype": "application/json",
      "data": {
        "orderId": "order-123",
        "customerId": "cust-456",
        "total": 99.99
      }
    }
  ],
  "constraints": [
    { "min_revision": { "revision": 42 } }
  ]
}
```

### Rust API
```rust
use uuid::Uuid;
use evidentsource_domain::core::state_change::transaction::{
    ProposedEvent, TransactionConstraint
};

let transaction_id = Uuid::new_v4();
let events = vec![
    ProposedEvent::new(
        "order-123",
        "order-service",
        "OrderCreated",
        serde_json::json!({
            "orderId": "order-123",
            "customerId": "cust-456",
            "total": 99.99
        }),
    ),
];

let constraints = vec![
    TransactionConstraint::MinRevision(42),
];

let (transaction, id_mappings) = app.transact(
    transaction_id,
    "my-events",
    events,
    constraints,
).await?;

println!("Transaction committed at revision {}", transaction.revision());
```

## CloudEvents Format
All events must conform to the CloudEvents specification:
### Required Fields

| Field | Description | Example |
|---|---|---|
| `id` | Unique event identifier | `"evt-001"` |
| `source` | Event origin/producer | `"order-service"` |
| `specversion` | CloudEvents version | `"1.0"` |
| `type` | Event type | `"OrderCreated"` |
### Recommended Fields

| Field | Description | Example |
|---|---|---|
| `subject` | Event subject/aggregate | `"order-123"` |
| `datacontenttype` | Data format | `"application/json"` |
| `data` | Event payload | `{"orderId": "order-123"}` |
| `time` | Event occurrence time | `"2024-01-15T10:00:00Z"` |
### Example Event

```json
{
  "id": "evt-001",
  "source": "order-service",
  "specversion": "1.0",
  "type": "OrderCreated",
  "subject": "order-123",
  "datacontenttype": "application/json",
  "time": "2024-01-15T10:00:00Z",
  "data": {
    "orderId": "order-123",
    "customerId": "cust-456",
    "items": [
      {"sku": "WIDGET-1", "quantity": 2, "price": 29.99},
      {"sku": "GADGET-2", "quantity": 1, "price": 39.99}
    ],
    "total": 99.97,
    "status": "pending"
  }
}
```

## Transaction Constraints

Constraints ensure database consistency and ordering guarantees:
### MinRevision Constraint

Ensures the database hasn’t changed since your last read:

```rust
// Only commit if the database is still at revision 42
TransactionConstraint::MinRevision(42)
```

Use cases:
- Read-modify-write patterns
- Preventing lost updates
- Ensuring sequential processing
### MaxRevision Constraint

Ensures events are appended in order:

```rust
// Only commit if no other events have been added to this stream
TransactionConstraint::MaxRevision(10, Some(EventSelector::Stream("order-service")))
```

Use cases:
- Maintaining stream order
- Preventing duplicate processing
- Implementing exactly-once semantics
### RevisionRange Constraint

Ensures a specific revision range exists:

```rust
// Ensure revisions 10-20 exist before committing
TransactionConstraint::RevisionRange(10, 20, None)
```

Use cases:
- Dependency checking
- Ensuring prerequisite events exist
- Maintaining causal ordering
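All three constraints ultimately reduce to comparisons against revision numbers (optionally scoped by an event selector, which this sketch omits). The following self-contained model shows one plausible reading of that evaluation; the `Constraint` enum mirrors the API names above, but the `check` helper is hypothetical and not part of EvidentSource:

```rust
// Hypothetical model of constraint evaluation against the revision
// state a transaction would observe at commit time. Selectors are
// omitted; `current_revision` stands for the relevant revision.
enum Constraint {
    // Holds when the database revision is at least this value.
    MinRevision(u64),
    // Holds when the selected scope has not advanced past this revision.
    MaxRevision(u64),
    // Holds when every revision in the inclusive range already exists.
    RevisionRange(u64, u64),
}

fn check(constraint: &Constraint, current_revision: u64) -> bool {
    match constraint {
        Constraint::MinRevision(min) => current_revision >= *min,
        Constraint::MaxRevision(max) => current_revision <= *max,
        Constraint::RevisionRange(start, end) => {
            start <= end && *end <= current_revision
        }
    }
}

fn main() {
    // Database at revision 42: MinRevision(42) holds, MaxRevision(10) fails.
    assert!(check(&Constraint::MinRevision(42), 42));
    assert!(!check(&Constraint::MaxRevision(10), 42));
    // Revisions 10-20 all exist once the database has reached revision 42.
    assert!(check(&Constraint::RevisionRange(10, 20), 42));
}
```

When a check fails, the whole transaction is rejected with a constraint violation, which is what the retry loop under Error Handling reacts to.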
## Idempotence

Transaction IDs provide exactly-once semantics:

```rust
let transaction_id = Uuid::new_v4();

// First attempt
let result1 = app.transact(transaction_id, "my-events", events.clone(), vec![]).await?;

// Retrying with the same transaction_id returns the same result without duplicating events
let result2 = app.transact(transaction_id, "my-events", events, vec![]).await?;

assert_eq!(result1.0.revision(), result2.0.revision());
```

Best practices:
- Generate transaction IDs on the client side
- Store transaction IDs for retry logic
- Use deterministic IDs for replayable operations
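For the last point, a replayable transaction ID can be derived from a stable, domain-level operation key so that retries and replays of the same operation reuse the same ID. The sketch below uses the standard library's `DefaultHasher` purely for illustration; its output is not guaranteed stable across Rust releases, and a real implementation would more likely use a name-based UUID (UUIDv5) over the operation key:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Derive a stable transaction ID from an operation key. The same
// operation key always yields the same ID, so the server can
// deduplicate retried submissions.
fn deterministic_txn_id(operation_key: &str) -> String {
    let mut hasher = DefaultHasher::new();
    operation_key.hash(&mut hasher);
    format!("txn-{:016x}", hasher.finish())
}

fn main() {
    let first = deterministic_txn_id("order-123:submit");
    let retry = deterministic_txn_id("order-123:submit"); // same operation, same ID
    let other = deterministic_txn_id("order-123:cancel"); // different operation
    assert_eq!(first, retry);
    assert_ne!(first, other);
}
```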
## Response Format

### Success Response

```json
{
  "transaction": {
    "database": "my-events",
    "revision": 43,
    "transaction_timestamp": "2024-01-15T10:00:00Z",
    "event_count": 1
  },
  "event_id_mappings": [
    {
      "client_event_id": "evt-001",
      "stored_event_id": "evt-001"
    }
  ]
}
```

### Event ID Mappings

The system may normalize or modify event IDs:
- Ensures uniqueness within streams
- Handles ID conflicts
- Maintains referential integrity
Use the returned mappings to track events after storage.
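In client code it is convenient to index the returned mappings by client event ID. A minimal sketch, assuming a struct shaped like the `event_id_mappings` entries in the success response (the normalized stored ID shown is invented for illustration):

```rust
use std::collections::HashMap;

// Hypothetical local shape of one `event_id_mappings` entry; the
// field names follow the JSON success response.
struct EventIdMapping {
    client_event_id: String,
    stored_event_id: String,
}

// Index the mappings so later code can resolve the ID an event was
// actually stored under.
fn index_mappings(mappings: &[EventIdMapping]) -> HashMap<&str, &str> {
    mappings
        .iter()
        .map(|m| (m.client_event_id.as_str(), m.stored_event_id.as_str()))
        .collect()
}

fn main() {
    let mappings = vec![EventIdMapping {
        client_event_id: "evt-001".to_string(),
        stored_event_id: "evt-001-a7f3".to_string(), // server may normalize IDs
    }];
    let index = index_mappings(&mappings);
    assert_eq!(index["evt-001"], "evt-001-a7f3");
}
```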
## Error Handling

### Common Errors

| Error Code | Description | Resolution |
|---|---|---|
| `NOT_FOUND` | Database doesn’t exist | Verify the database name |
| `INVALID_ARGUMENT` | Malformed event or constraint | Check the CloudEvents format |
| `FAILED_PRECONDITION` | Constraint violation | Retry with updated constraints |
| `RESOURCE_EXHAUSTED` | Transaction too large | Split into smaller transactions |
### Handling Constraint Violations

```rust
use evidentsource_api::TransactionError;

loop {
    // Get the latest revision
    let database = app.latest_database("my-events").await?;
    let current_revision = database.revision();

    // Attempt the transaction with the current revision
    let constraints = vec![TransactionConstraint::MinRevision(current_revision)];

    match app.transact(transaction_id, "my-events", &events, constraints).await {
        Ok(result) => break Ok(result),
        Err(TransactionError::ConstraintViolation(_)) => {
            // The database changed; retry with the new revision
            continue;
        }
        Err(e) => break Err(e),
    }
}
```

## Performance Considerations

### Transaction Size
Section titled “Transaction Size”- Optimal transaction size: 10-100 events
- Maximum transaction size: 1000 events or 1MB
- Larger transactions increase latency but improve throughput
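Given those limits, oversized batches should be split client-side before committing. A minimal count-based sketch (a real splitter would also respect the 1MB byte limit, and each chunk would be committed with its own client-generated transaction ID):

```rust
// Split a large batch into chunks that respect the documented
// 1000-event cap per transaction.
fn chunk_events<T>(events: Vec<T>, max_per_txn: usize) -> Vec<Vec<T>> {
    let mut chunks = Vec::new();
    let mut current = Vec::new();
    for event in events {
        current.push(event);
        if current.len() == max_per_txn {
            chunks.push(std::mem::take(&mut current));
        }
    }
    if !current.is_empty() {
        chunks.push(current);
    }
    chunks
}

fn main() {
    let events: Vec<u32> = (0..2500).collect();
    let chunks = chunk_events(events, 1000);
    // 2500 events become three transactions: 1000 + 1000 + 500.
    assert_eq!(chunks.len(), 3);
    assert_eq!(chunks[0].len(), 1000);
    assert_eq!(chunks[2].len(), 500);
}
```

Note that splitting a batch also splits its atomicity: each chunk commits independently, so only group events this way when cross-chunk atomicity is not required.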
### Throughput Optimization

```rust
// Group related events together in a single transaction
let events = vec![
    order_created_event,
    inventory_reserved_event,
    payment_initiated_event,
];

// One transaction instead of three
app.transact(transaction_id, "my-events", events, constraints).await?;
```

### Constraint Strategy
Section titled “Constraint Strategy”- Optimistic: No constraints for append-only workloads
- Pessimistic: MinRevision for consistency-critical operations
- Hybrid: Constraints only for specific streams or subjects
## Advanced Patterns

### Saga Orchestration

```rust
// Saga step with compensation tracking
let saga_id = Uuid::new_v4();
let events = vec![
    ProposedEvent::new(
        saga_id.to_string(),
        "saga-orchestrator",
        "SagaStarted",
        json!({"steps": ["reserve", "charge", "ship"]}),
    ),
    ProposedEvent::new(
        order_id,
        "order-service",
        "OrderReserved",
        json!({"sagaId": saga_id}),
    ),
];

app.transact(transaction_id, "my-events", events, vec![]).await?;
```

### Event Sourcing Aggregates
```rust
// Load the aggregate state
let events = app.query_events(
    "my-events",
    latest_revision,
    &DatabaseQuery::new()
        .with_selector(EventSelector::Subject("order-123"))
).collect().await?;

let aggregate = OrderAggregate::from_events(events);

// Generate new events from a command
let new_events = aggregate.handle_command(command)?;

// Persist with a consistency check
let constraints = vec![
    TransactionConstraint::MaxRevision(
        aggregate.version(),
        Some(EventSelector::Subject("order-123"))
    ),
];

app.transact(transaction_id, "my-events", new_events, constraints).await?;
```

### Distributed Transactions
```rust
// Two-phase commit pattern
let prepare_events = vec![
    ProposedEvent::new(tx_id, "coordinator", "TransactionPrepared", data),
];

// Phase 1: Prepare
app.transact(prepare_id, "my-events", prepare_events, vec![]).await?;

// Phase 2: Commit or abort based on participant responses
let decision_events = vec![
    ProposedEvent::new(tx_id, "coordinator", "TransactionCommitted", data),
];

app.transact(commit_id, "my-events", decision_events, vec![]).await?;
```

## Transaction Metadata

Each transaction captures metadata about its origin and context. This metadata is stored with the transaction and can be used for auditing, debugging, and analytics.
### Origin Tracking

The system automatically captures how each transaction was submitted:

| Origin | Description |
|---|---|
| `HttpTransaction` | Direct transaction via the HTTP API |
| `HttpStateChange` | State change execution via the HTTP API |
| `GrpcTransaction` | Direct transaction via the gRPC API |
| `GrpcStateChange` | State change execution via the gRPC API |
| `KafkaTransaction` | Async transaction via Kafka |
| `KafkaStateChange` | Async state change execution via Kafka |
### Kafka Origin Metadata

For Kafka-originated transactions, additional tracking information is captured:

```json
{
  "topic": "transaction-proposals",
  "partition": 3,
  "offset": 12847,
  "correlation_id": "550e8400-e29b-41d4-a716-446655440000"
}
```

This enables:
- Message tracing: Track transactions back to specific Kafka messages
- Replay debugging: Identify exact message for replay scenarios
- Correlation: Match responses to original requests
### Additional Metadata Fields

| Field | Description |
|---|---|
| `last_read_revision` | Database revision read before submitting (for causality tracking) |
| `principal_attributes` | Key-value pairs for identity/authorization context |
| `commit_message` | Optional human-readable description of the change |
## Best Practices

- **Group Related Events**: Combine causally related events in a single transaction
- **Use Meaningful IDs**: Include domain context in event IDs for debugging
- **Set Time Fields**: Provide the `time` field for accurate effective-time queries
- **Validate Early**: Validate events before attempting a transaction
- **Handle Failures**: Implement retry logic with exponential backoff
- **Monitor Metrics**: Track transaction sizes, revision gaps, and constraint violations
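The retry advice above can be made concrete with a capped exponential backoff schedule. A minimal sketch of the delay computation (the base and cap values are illustrative choices, not EvidentSource defaults; production code would usually also add jitter):

```rust
use std::time::Duration;

// Compute the delay before retry `attempt` (0-based): the base delay
// doubles on each attempt, capped so retries never wait too long.
fn backoff_delay(attempt: u32, base_ms: u64, cap_ms: u64) -> Duration {
    // Cap the shift amount to avoid overflowing the multiplier.
    let delay_ms = base_ms.saturating_mul(1u64 << attempt.min(20));
    Duration::from_millis(delay_ms.min(cap_ms))
}

fn main() {
    assert_eq!(backoff_delay(0, 100, 5_000), Duration::from_millis(100));
    assert_eq!(backoff_delay(1, 100, 5_000), Duration::from_millis(200));
    assert_eq!(backoff_delay(3, 100, 5_000), Duration::from_millis(800));
    // Delays are capped at 5 seconds regardless of attempt count.
    assert_eq!(backoff_delay(10, 100, 5_000), Duration::from_millis(5_000));
}
```

A caller would sleep for `backoff_delay(attempt, ...)` between iterations of a retry loop like the one shown under Handling Constraint Violations.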
## Next Steps

- Explore Querying Events to read your data
- Learn about Constraints for advanced consistency
- Understand Bi-Temporal Indexing for efficient queries
- Implement State Changes for command processing