State Changes Overview
State Changes API Overview
State Changes are WebAssembly components that implement the Decide pattern for command processing. They validate commands against current state and emit events representing accepted state transitions, providing a bridge between commands and events in event-sourced systems.
Concepts
The Decide Pattern
State Changes implement command handling logic:

    Events = decide(State, Command)

Each State Change:
- Receives commands via HTTP-like requests
- Loads current state from State Views
- Validates business rules and invariants
- Emits zero or more events on success
- Returns errors for invalid commands
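
The steps above can be sketched as a pure function. This is a minimal illustration, not the component interface: `OrderStatus`, `Command`, `Event`, and `Rejection` are hypothetical types invented for the example, and a real State Change would read its state through a State View rather than receive it as an argument.

```rust
// Hypothetical types for illustration only; a real component uses the
// request, event, and error types generated from its WIT bindings.
#[derive(Debug, Clone, PartialEq)]
pub enum OrderStatus {
    Open,
    Shipped,
    Cancelled,
}

#[derive(Debug, PartialEq)]
pub enum Command {
    Cancel { reason: String },
}

#[derive(Debug, PartialEq)]
pub enum Event {
    OrderCancelled { reason: String },
}

#[derive(Debug, PartialEq)]
pub enum Rejection {
    AlreadyShipped,
    AlreadyCancelled,
}

/// Events = decide(State, Command): no I/O and no mutation, only a decision.
pub fn decide(state: &OrderStatus, command: Command) -> Result<Vec<Event>, Rejection> {
    match (state, command) {
        // Invariants are checked first; a violation yields an error and no events
        (OrderStatus::Shipped, Command::Cancel { .. }) => Err(Rejection::AlreadyShipped),
        (OrderStatus::Cancelled, Command::Cancel { .. }) => Err(Rejection::AlreadyCancelled),
        // An accepted command yields the events that describe the transition
        (OrderStatus::Open, Command::Cancel { reason }) => {
            Ok(vec![Event::OrderCancelled { reason }])
        }
    }
}
```

Because the function only inspects its inputs, the same state and command always produce the same decision, which is what makes the rules easy to test in isolation.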
Command Processing Flow

    graph LR
        A[Command] --> B[State Change]
        B --> C[Load State]
        C --> D[Validate Rules]
        D --> E{Valid?}
        E -->|Yes| F[Emit Events]
        E -->|No| G[Return Error]
        F --> H[Transaction]

WebAssembly Components
State Changes are implemented as WASM components that:
- Run in isolation for security and reliability
- Can be written in any WASM-compatible language
- Are versioned for safe upgrades and rollbacks
- Support HTTP-like request/response patterns
Publishing State Changes
HTTP API
State Changes are published via a multipart/form-data POST request with two parts:
- metadata: JSON containing the state change configuration
- wasm: The compiled WebAssembly component binary

For example:

    curl -X POST http://localhost:3000/api/v1/db/my-events/state-changes \
      -F 'metadata={
        "state_change_name": "process-order",
        "description": "Handles order lifecycle commands",
        "source_code_uri": "https://github.com/example/order-processor"
      };type=application/json' \
      -F 'wasm=@target/wasm32-wasip1/release/order_processor.wasm;type=application/wasm'

Metadata Fields:
| Field | Required | Description |
|---|---|---|
| state_change_name | Yes | Unique name for the state change handler |
| description | No | Human-readable description |
| source_code_uri | No | URI to source code repository |
Response: 201 Created with a redirect to the admin page for the new state change version.
Rust API

    let wasm_bytes = std::fs::read("order-processor.wasm")?;

    let handler = app.publish_state_change_definition(
        "my-events",                                         // database
        "process-order",                                     // handler name
        Some("Handles order lifecycle commands"),            // description
        Some("https://github.com/example/order-processor"),  // source URI
        wasm_bytes,                                          // WASM component
    ).await?;

    println!("Published State Change version {}", handler.version());

gRPC API

    rpc PublishStateChange(PublishStateChangeRequest) returns (PublishStateChangeReply) {}

    message PublishStateChangeRequest {
      string database_name = 1;
      string state_change_name = 2;
      optional string description = 3;
      optional string source_code_uri = 5;
      bytes wasm_bytes = 6;
    }

Path Templates
State Changes use path templates for routing:

    // Static paths
    "/orders/create"

    // Path parameters
    "/orders/{order_id}/cancel"
    "/customers/{customer_id}/orders/{order_id}"

    // Action patterns
    "/orders/{id}/{action}"  // action: approve, reject, ship

Executing State Changes
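
Before a command can be handled, its request path has to be matched against a template such as "/orders/{order_id}/cancel". The sketch below shows one way that matching could work. The function name `match_template` and the rule that each `{name}` captures exactly one non-empty segment are assumptions for illustration; the source does not specify the matching semantics.

```rust
use std::collections::HashMap;

/// Matches `path` against `template` and returns the captured parameters.
/// Assumption: a `{name}` segment captures exactly one non-empty path segment,
/// and every other segment must match literally.
pub fn match_template(template: &str, path: &str) -> Option<HashMap<String, String>> {
    let template_parts: Vec<&str> = template.trim_matches('/').split('/').collect();
    let path_parts: Vec<&str> = path.trim_matches('/').split('/').collect();

    // A template only matches paths with the same number of segments
    if template_parts.len() != path_parts.len() {
        return None;
    }

    let mut params = HashMap::new();
    for (&t, &p) in template_parts.iter().zip(path_parts.iter()) {
        match t.strip_prefix('{').and_then(|rest| rest.strip_suffix('}')) {
            // Parameter segment: capture the value under the parameter name
            Some(name) if !p.is_empty() => {
                params.insert(name.to_string(), p.to_string());
            }
            // Parameter segment with an empty value: no match
            Some(_) => return None,
            // Literal segment: must be identical
            None if t == p => {}
            None => return None,
        }
    }
    Some(params)
}
```

With this sketch, matching "/orders/ORDER-123/cancel" against "/orders/{order_id}/cancel" captures `order_id`, while a path with a different literal segment or a different segment count does not match.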
Rust API

    use evidentsource_domain::state_changes::CommandRequest;

    let request = CommandRequest {
        method: "POST".into(),
        path: "/orders/ORDER-123/cancel".into(),
        headers: vec![
            ("Content-Type".into(), "application/json".into()),
            ("Authorization".into(), "Bearer token123".into()),
        ],
        query_params: vec![
            ("reason".into(), "customer-request".into()),
        ],
        body: Some(r#"{
            "cancellation_reason": "Customer requested cancellation",
            "refund_amount": 99.99
        }"#.into()),
    };

    let batch = app.execute_state_change(
        "my-events",
        "process-order",
        1,         // Version 1 of the handler
        Some(99),  // Last seen at revision 99
        request,
    ).await?;

    println!("Command generated events at revision {}", batch.revision());

gRPC API

    rpc ExecuteStateChange(ExecuteStateChangeRequest) returns (ExecuteStateChangeReply) {}

    message ExecuteStateChangeRequest {
      string database_name = 1;
      string state_change_name = 2;
      uint64 version = 3;
      optional uint64 last_seen_revision = 4;
      CommandRequest request = 5;
    }

    message CommandRequest {
      string method = 1;
      string path = 2;
      repeated QueryParameter query_params = 3;
      repeated Header headers = 4;
      optional bytes body = 5;
    }

HTTP API

    POST /api/v1/db/{database}/state-changes/{name}/v{version}/execute
    Content-Type: application/json
    X-Last-Seen-Revision: 99

    {
      "method": "POST",
      "path": "/orders/ORDER-123/cancel",
      "body": {
        "cancellation_reason": "Customer requested cancellation",
        "refund_amount": 99.99
      }
    }

Implementation Example
Section titled “Implementation Example”WASM Component (Rust)

    use chrono::Utc;
    use serde::{Deserialize, Serialize};
    use uuid::Uuid;

    // Assumed shape of the "order-summary" State View; only `status` is used here
    #[derive(Deserialize)]
    struct Order {
        status: String,
    }

    #[derive(Deserialize)]
    struct CancelOrderCommand {
        cancellation_reason: String,
        refund_amount: f64,
    }

    #[derive(Serialize)]
    struct OrderCancelled {
        order_id: String,
        reason: String,
        refund_amount: f64,
        cancelled_at: String,
    }

    wit_bindgen::generate!({
        world: "state-change",
        exports: {
            world: OrderProcessor,
        },
    });

    struct OrderProcessor;

    impl Guest for OrderProcessor {
        fn decide(
            state_reader: StateReader,
            command: CommandRequest,
        ) -> Result<Vec<Event>, CommandError> {
            // Parse path parameters: "/orders/{order_id}/{action}" splits into
            // ["", "orders", order_id, action]. Any other shape is rejected
            // instead of panicking on an out-of-bounds index.
            let path_parts: Vec<&str> = command.path.split('/').collect();
            let (order_id, action) = match path_parts.as_slice() {
                ["", "orders", order_id, action] => (*order_id, *action),
                _ => {
                    return Err(CommandError::InvalidInput(
                        format!("Unexpected path: {}", command.path)
                    ));
                }
            };

            match action {
                "cancel" => {
                    // Load current order state
                    let order_state = state_reader.get_state_view(
                        "order-summary",
                        &[("order_id", order_id)]
                    )?;

                    let order: Order = serde_json::from_slice(&order_state)?;

                    // Validate business rules
                    if order.status == "shipped" {
                        return Err(CommandError::ValidationFailed(
                            "Cannot cancel shipped orders".into()
                        ));
                    }

                    if order.status == "cancelled" {
                        return Err(CommandError::ValidationFailed(
                            "Order already cancelled".into()
                        ));
                    }

                    // Parse command body; a missing body is an input error, not a panic
                    let body = command.body.ok_or_else(|| {
                        CommandError::InvalidInput("Missing request body".into())
                    })?;
                    let cmd: CancelOrderCommand = serde_json::from_slice(&body)?;

                    // Emit event
                    let event = Event {
                        id: Uuid::new_v4().to_string(),
                        source: "order-processor".into(),
                        type_: "OrderCancelled".into(),
                        subject: Some(order_id.into()),
                        data: serde_json::to_vec(&OrderCancelled {
                            order_id: order_id.into(),
                            reason: cmd.cancellation_reason,
                            refund_amount: cmd.refund_amount,
                            cancelled_at: Utc::now().to_rfc3339(),
                        })?,
                    };

                    Ok(vec![event])
                }
                _ => Err(CommandError::UnknownCommand(action.into())),
            }
        }
    }

Compilation

    # Compile to WASM component
    cargo component build --release

Optimistic Concurrency Control
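With optimistic concurrency, a writer states the revision it last observed, and the write is rejected if the database has advanced since then. The in-memory sketch below illustrates that check in isolation. `InMemoryLog`, `AppendError`, and the rule that the two revisions must match exactly are assumptions made for illustration; they are not the EvidentSource implementation.

```rust
/// Hypothetical in-memory stand-in for a database whose revision grows
/// with every appended event.
#[derive(Debug, Default)]
pub struct InMemoryLog {
    events: Vec<String>,
}

#[derive(Debug, PartialEq)]
pub enum AppendError {
    ConcurrencyConflict { expected: u64, actual: u64 },
}

impl InMemoryLog {
    /// The revision is modeled as the number of events appended so far.
    pub fn revision(&self) -> u64 {
        self.events.len() as u64
    }

    /// Appends `new_events` only if the caller has seen the latest revision.
    /// Passing `None` skips the check, like an omitted `last_seen_revision`.
    pub fn append(
        &mut self,
        last_seen_revision: Option<u64>,
        new_events: Vec<String>,
    ) -> Result<u64, AppendError> {
        let actual = self.revision();
        if let Some(expected) = last_seen_revision {
            if expected != actual {
                // Someone else wrote in between: reject so the caller can reload and retry
                return Err(AppendError::ConcurrencyConflict { expected, actual });
            }
        }
        self.events.extend(new_events);
        Ok(self.revision())
    }
}
```

A caller that receives a conflict is expected to reload state, decide again, and resubmit with the new revision.
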
State Changes support optimistic concurrency through last_seen_revision:

    // Retry pattern for concurrent modifications.
    // The loop is bound to `result` so that the value passed to `break` is used;
    // production code would also cap the number of retries.
    let result = loop {
        // Get current state
        let database = app.latest_database("my-events").await?;
        let current_revision = database.revision();

        // Execute with revision check
        match app.execute_state_change(
            "my-events",
            "process-order",
            1,
            Some(current_revision),  // Ensure no concurrent changes
            request.clone(),
        ).await {
            Ok(batch) => break Ok(batch),
            Err(StateChangeExecutionError::ConcurrencyConflict) => {
                // Retry with updated state
                continue;
            }
            Err(e) => break Err(e),
        }
    };

State Reader Interface
State Changes access current state through the StateReader:

    trait StateReader {
        // Get State View at current revision
        fn get_state_view(
            &self,
            view_name: &str,
            parameters: &[(&str, &str)],
        ) -> Result<Vec<u8>, Error>;

        // Query events directly
        fn query_events(
            &self,
            selector: EventSelector,
            limit: usize,
        ) -> Result<Vec<Event>, Error>;

        // Get database metadata
        fn get_database_info(&self) -> DatabaseInfo;
    }

Error Handling
Section titled “Error Handling”Command Validation Errors

    enum CommandError {
        ValidationFailed(String),  // Business rule violation
        UnknownCommand(String),    // Unrecognized action
        InvalidInput(String),      // Malformed request
        StateNotFound(String),     // Required state missing
        InternalError(String),     // Processing failure
    }

Execution Errors

    use evidentsource_api_wasm::StateChangeExecutionError;

    match app.execute_state_change(...).await {
        Ok(batch) => {
            println!("Success: {} events", batch.event_count());
        }
        Err(StateChangeExecutionError::ValidationError(msg)) => {
            eprintln!("Validation failed: {}", msg);
        }
        Err(StateChangeExecutionError::ConcurrencyConflict) => {
            eprintln!("Database changed, retry needed");
        }
        Err(StateChangeExecutionError::StateChangeNotFound(name)) => {
            eprintln!("Handler {} not found", name);
        }
        Err(e) => eprintln!("Execution failed: {}", e),
    }

Versioning Strategy
Section titled “Versioning Strategy”Parallel Versions
Run multiple versions simultaneously:

    // Version 1 - stable
    let v1_result = app.execute_state_change(
        "my-events", "process-order", 1, None, request.clone()
    ).await?;

    // Version 2 - testing
    let v2_result = app.execute_state_change(
        "my-events", "process-order", 2, None, request.clone()
    ).await?;

    // Compare results
    assert_eq!(v1_result.event_count(), v2_result.event_count());

Gradual Rollout

    use rand::Rng;

    // Route percentage of traffic to new version
    let version = if rand::thread_rng().gen_range(0..100) < 10 {
        2  // 10% to new version
    } else {
        1  // 90% to stable version
    };

    app.execute_state_change(
        "my-events", "process-order", version, None, request
    ).await?;

Complex Command Patterns
Saga Orchestration

    // Saga coordinator State Change.
    // `load_saga_state` and `parse_step_completion` are hypothetical helpers
    // that are not shown here.
    impl Guest for SagaCoordinator {
        fn decide(
            state_reader: StateReader,
            command: CommandRequest,
        ) -> Result<Vec<Event>, CommandError> {
            match command.path.as_str() {
                "/sagas/order-fulfillment/start" => {
                    // Only a new saga gets a freshly generated ID
                    let saga_id = Uuid::new_v4();
                    Ok(vec![
                        Event::saga_started(saga_id, vec![
                            "reserve-inventory",
                            "charge-payment",
                            "schedule-shipping",
                        ]),
                        Event::saga_step_initiated(saga_id, "reserve-inventory"),
                    ])
                }
                "/sagas/step/completed" => {
                    // An existing saga must be identified by the command itself.
                    // Assumption: the command carries the saga ID and the step
                    // that just completed.
                    let (saga_id, completed_step) = parse_step_completion(&command)?;
                    let saga_state = load_saga_state(&state_reader, saga_id)?;

                    // Assumption: next_pending_step() returns the step that
                    // follows the completed one, if any
                    match saga_state.next_pending_step() {
                        Some(next_step) => Ok(vec![
                            Event::saga_step_completed(saga_id, completed_step),
                            Event::saga_step_initiated(saga_id, next_step),
                        ]),
                        None => Ok(vec![
                            Event::saga_completed(saga_id),
                        ]),
                    }
                }
                _ => Err(CommandError::UnknownCommand(command.path)),
            }
        }
    }

Batch Commands

    // Process multiple items in one command
    #[derive(Deserialize)]
    struct BatchUpdateCommand {
        updates: Vec<ItemUpdate>,
    }

    impl Guest for BatchProcessor {
        fn decide(
            state_reader: StateReader,
            command: CommandRequest,
        ) -> Result<Vec<Event>, CommandError> {
            // A missing body is reported as an input error instead of panicking
            let body = command.body.ok_or_else(|| {
                CommandError::InvalidInput("Missing request body".into())
            })?;
            let batch: BatchUpdateCommand = serde_json::from_slice(&body)?;

            let mut events = Vec::new();

            for update in batch.updates {
                // Validate each item
                let item_state = state_reader.get_state_view(
                    "item-summary",
                    &[("item_id", update.item_id.as_str())]
                )?;

                // Items that fail validation are skipped; the rest still emit events
                if validate_update(&item_state, &update)? {
                    events.push(Event::item_updated(update));
                }
            }

            Ok(events)
        }
    }

Performance Optimization
State Caching

    use std::collections::HashMap;

    // Cache frequently accessed state
    struct CachingStateReader {
        inner: StateReader,
        cache: HashMap<String, Vec<u8>>,
    }

    impl CachingStateReader {
        fn get_state_view_cached(
            &mut self,
            view_name: &str,
            parameters: &[(&str, &str)],
        ) -> Result<Vec<u8>, Error> {
            // The cache key combines the view name with its parameters
            let key = format!("{}:{:?}", view_name, parameters);

            if let Some(cached) = self.cache.get(&key) {
                return Ok(cached.clone());
            }

            let state = self.inner.get_state_view(view_name, parameters)?;
            self.cache.insert(key, state.clone());
            Ok(state)
        }
    }

Batch Event Generation

    // Generate multiple events efficiently
    fn generate_order_events(order: &Order) -> Vec<Event> {
        let mut events = Vec::with_capacity(3);

        events.push(Event::order_confirmed(order));
        events.push(Event::inventory_reserved(order));
        events.push(Event::payment_initiated(order));

        events
    }

Best Practices
- Idempotent Commands: Design commands to be safely retryable
- Validate Early: Check preconditions before expensive operations
- Clear Errors: Return specific, actionable error messages
- Version Carefully: Test new versions thoroughly before deployment
- Monitor Performance: Track execution time and error rates
- Use Transactions: Leverage batch transactions for consistency
- Cache State: Avoid redundant state loads within a command
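
The first practice above, idempotent commands, can be sketched as follows. This assumes each command carries a client-generated ID and that the state exposes the IDs already applied; `ProcessedCommands`, `PaymentCaptured`, and `decide_capture` are hypothetical names used only for this example.

```rust
use std::collections::HashSet;

/// Hypothetical state: the IDs of commands that have already been applied.
#[derive(Debug, Default)]
pub struct ProcessedCommands {
    seen: HashSet<String>,
}

#[derive(Debug, PartialEq)]
pub struct PaymentCaptured {
    pub command_id: String,
    pub amount_cents: u64,
}

impl ProcessedCommands {
    /// Folds an emitted event into the state so later retries are recognised.
    pub fn apply(&mut self, event: &PaymentCaptured) {
        self.seen.insert(event.command_id.clone());
    }
}

/// Returns no events when the command was already applied, so a retry is a no-op.
pub fn decide_capture(
    state: &ProcessedCommands,
    command_id: &str,
    amount_cents: u64,
) -> Vec<PaymentCaptured> {
    if state.seen.contains(command_id) {
        return Vec::new();
    }
    vec![PaymentCaptured {
        command_id: command_id.to_string(),
        amount_cents,
    }]
}
```

Returning an empty event list for a repeated command relies on the fact that a State Change may emit zero events on success, so the caller can retry after a timeout without risking a duplicate effect.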
Next Steps
- Learn Defining State Changes
- Explore Executing Commands
- Understand Error Handling
- Implement State Views for state storage