State Changes
State Changes
Section titled “State Changes”State Changes are WebAssembly components that handle commands in EvidentSource, implementing the write-side of CQRS (Command Query Responsibility Segregation). They process incoming HTTP-like requests, validate them, and emit events that modify the system state.
Overview
Section titled “Overview”State Changes serve as command handlers that:
- Accept REST-like HTTP requests with path parameters
- Query current state via State Views for decision making
- Emit CloudEvents to record state transitions
- Enforce business rules through transaction constraints
- Execute in a sandboxed WebAssembly environment with resource limits
Key Concepts
Section titled “Key Concepts”Path Templates
Section titled “Path Templates”State Changes use REST-like path templates to define their routing patterns:
/accounts/{account_id}/transfer/users/{user_id}/posts/{post_id}/orders/{order_id}/cancelPath parameters are automatically extracted and made available to your component.
Command Processing Flow
Section titled “Command Processing Flow”- Request Reception: HTTP-like request arrives at the path template endpoint
- Parameter Extraction: Path parameters are extracted from the URL
- State Query: Component can query State Views to read current state
- Business Logic: Component validates the command and decides on events
- Event Emission: Zero or more CloudEvents are emitted
- Constraint Definition: Transaction constraints ensure consistency
Execution Limits
Section titled “Execution Limits”State Changes run with configurable resource limits:
- Memory: Default 100MB limit
- Timeout: Default 30 seconds
- Epoch Interruption: Efficient timeout mechanism using WebAssembly epochs
Writing a State Change Component
Section titled “Writing a State Change Component”WIT Interface Definition
Section titled “WIT Interface Definition”State Changes implement the state-change world defined in WIT:
package evidentsource:decider@0.1.0;
world state-change { import query-state-view; export decide;}Rust Implementation Example
Section titled “Rust Implementation Example”Here’s a complete example of a bank account transfer State Change:
use serde::{Deserialize, Serialize};use chrono::Utc;
wit_bindgen::generate!({ world: "state-change", path: "../../interface",});
use exports::evidentsource::decider::state_change_exports::Guest;use evidentsource::decider::cloudevents::{Cloudevent, CloudeventData};use evidentsource::decider::state_change_types::{ TransactionConstraint, CommandRequest, DecideError, DecideResult, StateChangeContext, EventAttribute, EventSelector, MinConstraint};
struct BankTransferHandler;
#[derive(Serialize, Deserialize)]struct TransferRequest { to_account: String, amount: f64, description: Option<String>,}
#[derive(Serialize, Deserialize)]struct TransferEvent { from_account: String, to_account: String, amount: f64, description: Option<String>, timestamp: String,}
impl Guest for BankTransferHandler { fn decide( context: StateChangeContext, request: CommandRequest, ) -> DecideResult { // Extract account_id from path (assumes /accounts/{account_id}/transfer) let path_parts: Vec<&str> = request.path.split('/').collect(); let from_account = if path_parts.len() >= 3 { path_parts[2].to_string() } else { return Err(DecideError { error_type: "validation".to_string(), message: "Invalid path format".to_string(), }); };
// Parse request body let body = request.body.ok_or_else(|| DecideError { error_type: "validation".to_string(), message: "Request body required".to_string(), })?;
let transfer_request: TransferRequest = serde_json::from_slice(&body).map_err(|e| DecideError { error_type: "validation".to_string(), message: format!("Invalid request body: {}", e), })?;
// Validate amount if transfer_request.amount <= 0.0 { return Err(DecideError { error_type: "validation".to_string(), message: "Transfer amount must be positive".to_string(), }); }
// Query current account balance let balance_params = vec![ ("account_id".to_string(), EventAttribute::Subject(Some(from_account.clone()))), ];
let balance_view = query_state_view( context.database.clone(), "account-balance".to_string(), 1, // version context.database_revision, balance_params, None, // no effective time ).map_err(|e| DecideError { error_type: "internal".to_string(), message: format!("Failed to query balance: {}", e), })?;
// Check sufficient balance if let Some(view) = balance_view { if let Some(content) = view.content { let balance_data: serde_json::Value = serde_json::from_slice(&content).unwrap_or_default(); let current_balance = balance_data["balance"].as_f64().unwrap_or(0.0);
if current_balance < transfer_request.amount { return Err(DecideError { error_type: "validation".to_string(), message: "Insufficient balance".to_string(), }); } } }
// Create transfer event let transfer_event = TransferEvent { from_account: from_account.clone(), to_account: transfer_request.to_account.clone(), amount: transfer_request.amount, description: transfer_request.description, timestamp: Utc::now().to_rfc3339(), };
let event = Cloudevent { id: format!("transfer-{}-{}", from_account, Utc::now().timestamp()), source: format!("accounts/{}/transfers", from_account), type_: "account.transfer.completed".to_string(), subject: Some(format!("account:{}", from_account)), time: None, data: Some(CloudeventData::String( serde_json::to_string(&transfer_event).unwrap() )), attributes: vec![], };
// Add optimistic concurrency control // Ensure no other transfers have occurred since we read the balance let constraint = TransactionConstraint::Min(MinConstraint { selector: EventSelector::Equals( EventAttribute::Subject(Some(format!("account:{}", from_account))) ), revision: context.last_seen_revision, });
Ok((vec![event], vec![constraint])) }}
export!(BankTransferHandler);Building the Component
Section titled “Building the Component”Create a Cargo.toml:
[package]name = "bank-transfer-handler"version = "0.1.0"edition = "2021"
[dependencies]wit-bindgen = "0.38"serde = { version = "1.0", features = ["derive"] }serde_json = "1.0"chrono = { version = "0.4", features = ["serde"] }
[lib]crate-type = ["cdylib"]
[profile.release]opt-level = "z"lto = truecodegen-units = 1Build the component:
cargo build --target wasm32-wasip2 --releaseDeploying State Changes
Section titled “Deploying State Changes”Via REST API
Section titled “Via REST API”Upload the compiled WASM component:
# Create the state changecurl -X POST http://localhost:8080/db/mybank/admin/state-changes \ -H "Content-Type: multipart/form-data" \ -F "name=bank-transfer" \ -F "version=1" \ -F "description=Handles account transfers" \ -F "wasm=@target/wasm32-wasip2/release/bank_transfer_handler.wasm"Via gRPC
Section titled “Via gRPC”Use the gRPC API to register state changes programmatically.
Testing State Changes
Section titled “Testing State Changes”Test Transfer Request
Section titled “Test Transfer Request”Once deployed, test your State Change:
# Execute a transfercurl -X POST http://localhost:8080/db/mybank/accounts/acc-123/transfer \ -H "Content-Type: application/json" \ -H "X-Evident-Revision: 42" \ -d '{ "to_account": "acc-456", "amount": 100.00, "description": "Payment for services" }'Response Format
Section titled “Response Format”Successful execution returns the emitted events:
{ "events": [ { "id": "transfer-acc-123-1703123456", "source": "accounts/acc-123/transfers", "type": "account.transfer.completed", "subject": "account:acc-123", "data": { "from_account": "acc-123", "to_account": "acc-456", "amount": 100.00, "description": "Payment for services", "timestamp": "2024-01-15T10:30:00Z" } } ], "revision": 43}Advanced Features
Section titled “Advanced Features”Querying State Views
Section titled “Querying State Views”State Changes can query State Views to make decisions based on current state:
let params = vec![ ("user_id".to_string(), EventAttribute::Subject(Some(user_id))),];
let user_profile = query_state_view( database, "user-profile".to_string(), 1, // version revision, params, None, // effective time)?;Transaction Constraints
Section titled “Transaction Constraints”Ensure consistency with transaction constraints:
// Minimum revision constraint - ensure state hasn't changedTransactionConstraint::Min(MinConstraint { selector: EventSelector::Equals( EventAttribute::Stream("accounts".to_string()) ), revision: last_seen_revision,})
// Maximum revision constraint - ensure entity existsTransactionConstraint::Max(MaxConstraint { selector: EventSelector::Equals( EventAttribute::Subject(Some(entity_id)) ), revision: u64::MAX,})
// Range constraint - specific revision rangeTransactionConstraint::Range(RangeConstraint { selector: EventSelector::StartsWith( EventAttribute::EventType("order.".to_string()) ), min: 100, max: 200,})Error Handling
Section titled “Error Handling”Return appropriate error types:
validation: Input validation failuresunauthorized: Permission deniedconflict: Optimistic concurrency conflictsinternal: System errors
Err(DecideError { error_type: "validation".to_string(), message: "Amount must be positive".to_string(),})Best Practices
Section titled “Best Practices”- Idempotency: Design commands to be idempotent when possible
- Event Granularity: Emit fine-grained events for better auditability
- State Queries: Only query necessary state to minimize latency
- Error Messages: Provide clear, actionable error messages
- Path Design: Use RESTful conventions for path templates
- Versioning: Version your State Changes for backward compatibility
- Testing: Test with various input combinations and edge cases
Complete Example: Order Processing
Section titled “Complete Example: Order Processing”Here’s a complete example you can use to test State Changes:
Order State Change Component
Section titled “Order State Change Component”use serde::{Deserialize, Serialize};use chrono::Utc;use std::collections::HashMap;
wit_bindgen::generate!({ world: "state-change", path: "../../interface",});
use exports::evidentsource::decider::state_change_exports::Guest;use evidentsource::decider::cloudevents::{Cloudevent, CloudeventData};use evidentsource::decider::state_change_types::{ TransactionConstraint, CommandRequest, DecideError, DecideResult, StateChangeContext, EventAttribute, EventSelector, MaxConstraint};
struct OrderProcessor;
#[derive(Serialize, Deserialize)]struct CreateOrderRequest { customer_id: String, items: Vec<OrderItem>, shipping_address: String,}
#[derive(Serialize, Deserialize)]struct OrderItem { product_id: String, quantity: u32, price: f64,}
#[derive(Serialize, Deserialize)]struct OrderCreatedEvent { order_id: String, customer_id: String, items: Vec<OrderItem>, total_amount: f64, shipping_address: String, status: String, created_at: String,}
impl Guest for OrderProcessor { fn decide( context: StateChangeContext, request: CommandRequest, ) -> DecideResult { // Route based on HTTP method match request.method.as_str() { "POST" => create_order(context, request), "PUT" => update_order(context, request), "DELETE" => cancel_order(context, request), _ => Err(DecideError { error_type: "validation".to_string(), message: format!("Unsupported method: {}", request.method), }), } }}
fn create_order( context: StateChangeContext, request: CommandRequest,) -> DecideResult { // Parse request body let body = request.body.ok_or_else(|| DecideError { error_type: "validation".to_string(), message: "Request body required".to_string(), })?;
let order_request: CreateOrderRequest = serde_json::from_slice(&body).map_err(|e| DecideError { error_type: "validation".to_string(), message: format!("Invalid request: {}", e), })?;
// Validate items if order_request.items.is_empty() { return Err(DecideError { error_type: "validation".to_string(), message: "Order must contain at least one item".to_string(), }); }
// Calculate total let total_amount: f64 = order_request.items .iter() .map(|item| item.price * item.quantity as f64) .sum();
// Generate order ID let order_id = format!("ORD-{}", Utc::now().timestamp());
// Create order created event let event_data = OrderCreatedEvent { order_id: order_id.clone(), customer_id: order_request.customer_id.clone(), items: order_request.items, total_amount, shipping_address: order_request.shipping_address, status: "pending".to_string(), created_at: Utc::now().to_rfc3339(), };
let event = Cloudevent { id: format!("order-created-{}", order_id), source: "orders".to_string(), type_: "order.created".to_string(), subject: Some(format!("order:{}", order_id)), time: None, data: Some(CloudeventData::String( serde_json::to_string(&event_data).unwrap() )), attributes: vec![ ("customer_id".to_string(), order_request.customer_id.clone()), ("total_amount".to_string(), total_amount.to_string()), ], };
// Ensure customer exists (has at least one event) let constraint = TransactionConstraint::Max(MaxConstraint { selector: EventSelector::Equals( EventAttribute::Subject(Some(format!("customer:{}", order_request.customer_id))) ), revision: u64::MAX, });
Ok((vec![event], vec![constraint]))}
fn update_order( _context: StateChangeContext, _request: CommandRequest,) -> DecideResult { // Implementation for order updates Ok((vec![], vec![]))}
fn cancel_order( context: StateChangeContext, request: CommandRequest,) -> DecideResult { // Extract order_id from path let path_parts: Vec<&str> = request.path.split('/').collect(); let order_id = if path_parts.len() >= 3 { path_parts[2].to_string() } else { return Err(DecideError { error_type: "validation".to_string(), message: "Invalid path format".to_string(), }); };
// Create cancellation event let event = Cloudevent { id: format!("order-cancelled-{}-{}", order_id, Utc::now().timestamp()), source: "orders".to_string(), type_: "order.cancelled".to_string(), subject: Some(format!("order:{}", order_id)), time: None, data: Some(CloudeventData::String( serde_json::to_string(&serde_json::json!({ "order_id": order_id, "cancelled_at": Utc::now().to_rfc3339(), "reason": "Customer requested" })).unwrap() )), attributes: vec![], };
Ok((vec![event], vec![]))}
export!(OrderProcessor);Testing the Order Processor
Section titled “Testing the Order Processor”- Deploy the State Change:
curl -X POST http://localhost:8080/db/mystore/admin/state-changes \ -F "name=order-processor" \ -F "version=1" \ -F "path_template=/orders/{order_id}" \ -F "wasm=@order_processor.wasm"- Create an Order:
curl -X POST http://localhost:8080/db/mystore/orders/new \ -H "Content-Type: application/json" \ -d '{ "customer_id": "CUST-123", "items": [ {"product_id": "PROD-A", "quantity": 2, "price": 29.99}, {"product_id": "PROD-B", "quantity": 1, "price": 49.99} ], "shipping_address": "123 Main St, City, Country" }'- Cancel an Order:
curl -X DELETE http://localhost:8080/db/mystore/orders/ORD-1703123456Monitoring and Debugging
Section titled “Monitoring and Debugging”Execution Metrics
Section titled “Execution Metrics”State Changes emit metrics for monitoring:
- Execution time
- Memory usage
- Success/failure rates
- Event emission counts
Error Logs
Section titled “Error Logs”Failed executions are logged with:
- Request details
- Error type and message
- Stack traces (in debug mode)
- Resource usage at failure
Testing Tips
Section titled “Testing Tips”- Test with invalid inputs to verify validation
- Test concurrent requests to verify constraints
- Monitor resource usage under load
- Verify idempotency with duplicate requests
- Test error scenarios and recovery
Limitations
Section titled “Limitations”- Maximum execution time: 30 seconds (configurable)
- Maximum memory: 100MB (configurable)
- No direct network access from components
- No filesystem access from components
- State queries are read-only
Next Steps
Section titled “Next Steps”- Learn about State Views for the read side of CQRS
- Explore Transaction Constraints for consistency
- Review CloudEvents format and conventions
- See API Documentation for deployment details