Skip to content

State Changes Overview

State Changes are WebAssembly components that implement the Decide pattern for command processing. They validate commands against current state and emit events representing accepted state transitions, providing a bridge between commands and events in event-sourced systems.

State Changes implement command handling logic:

Events = decide(State, Command)

Each State Change:

  • Receives commands via HTTP-like requests
  • Loads current state from State Views
  • Validates business rules and invariants
  • Emits zero or more events on success
  • Returns errors for invalid commands
graph LR
A[Command] --> B[State Change]
B --> C[Load State]
C --> D[Validate Rules]
D --> E{Valid?}
E -->|Yes| F[Emit Events]
E -->|No| G[Return Error]
F --> H[Transaction]

State Changes are implemented as WASM components that:

  • Run in isolation for security and reliability
  • Can be written in any WASM-compatible language
  • Are versioned for safe upgrades and rollbacks
  • Support HTTP-like request/response patterns

State Changes are published via multipart/form-data POST request with two parts:

  • metadata: JSON containing the state change configuration
  • wasm: The compiled WebAssembly component binary
Terminal window
curl -X POST http://localhost:3000/api/v1/db/my-events/state-changes \
-F 'metadata={
"state_change_name": "process-order",
"description": "Handles order lifecycle commands",
"source_code_uri": "https://github.com/example/order-processor"
};type=application/json' \
-F 'wasm=@target/wasm32-wasip1/release/order_processor.wasm;type=application/wasm'

Metadata Fields:

FieldRequiredDescription
state_change_nameYesUnique name for the state change handler
descriptionNoHuman-readable description
source_code_uriNoURI to source code repository

Response: 201 Created with redirect to the admin page for the new state change version.

let wasm_bytes = std::fs::read("order-processor.wasm")?;
let handler = app.publish_state_change_definition(
"my-events", // database
"process-order", // handler name
Some("Handles order lifecycle commands"), // description
Some("https://github.com/example/order-processor"), // source URI
wasm_bytes, // WASM component
).await?;
println!("Published State Change version {}", handler.version());
rpc PublishStateChange(PublishStateChangeRequest)
returns (PublishStateChangeReply) {}
message PublishStateChangeRequest {
string database_name = 1;
string state_change_name = 2;
optional string description = 3;
optional string source_code_uri = 5;
bytes wasm_bytes = 6;
}

State Changes use path templates for routing:

// Static paths
"/orders/create"
// Path parameters
"/orders/{order_id}/cancel"
"/customers/{customer_id}/orders/{order_id}"
// Action patterns
"/orders/{id}/{action}" // action: approve, reject, ship
use evidentsource_domain::state_changes::CommandRequest;
let request = CommandRequest {
method: "POST".into(),
path: "/orders/ORDER-123/cancel".into(),
headers: vec![
("Content-Type".into(), "application/json".into()),
("Authorization".into(), "Bearer token123".into()),
],
query_params: vec![
("reason".into(), "customer-request".into()),
],
body: Some(r#"{
"cancellation_reason": "Customer requested cancellation",
"refund_amount": 99.99
}"#.into()),
};
let batch = app.execute_state_change(
"my-events",
"process-order",
1, // Version 1 of the handler
Some(99), // Last seen at revision 99
request,
).await?;
println!("Command generated events at revision {}", batch.revision());
rpc ExecuteStateChange(ExecuteStateChangeRequest)
returns (ExecuteStateChangeReply) {}
message ExecuteStateChangeRequest {
string database_name = 1;
string state_change_name = 2;
uint64 version = 3;
optional uint64 last_seen_revision = 4;
CommandRequest request = 5;
}
message CommandRequest {
string method = 1;
string path = 2;
repeated QueryParameter query_params = 3;
repeated Header headers = 4;
optional bytes body = 5;
}
POST /api/v1/db/{database}/state-changes/{name}/v{version}/execute
Content-Type: application/json
X-Last-Seen-Revision: 99
{
"method": "POST",
"path": "/orders/ORDER-123/cancel",
"body": {
"cancellation_reason": "Customer requested cancellation",
"refund_amount": 99.99
}
}
use serde::{Deserialize, Serialize};
#[derive(Deserialize)]
struct CancelOrderCommand {
cancellation_reason: String,
refund_amount: f64,
}
#[derive(Serialize)]
struct OrderCancelled {
order_id: String,
reason: String,
refund_amount: f64,
cancelled_at: String,
}
wit_bindgen::generate!({
world: "state-change",
exports: {
world: OrderProcessor,
},
});
struct OrderProcessor;
impl Guest for OrderProcessor {
fn decide(
state_reader: StateReader,
command: CommandRequest,
) -> Result<Vec<Event>, CommandError> {
// Parse path parameters
let path_parts: Vec<&str> = command.path.split('/').collect();
let order_id = path_parts[2];
let action = path_parts[3];
match action {
"cancel" => {
// Load current order state
let order_state = state_reader.get_state_view(
"order-summary",
&[("order_id", order_id)]
)?;
let order: Order = serde_json::from_slice(&order_state)?;
// Validate business rules
if order.status == "shipped" {
return Err(CommandError::ValidationFailed(
"Cannot cancel shipped orders".into()
));
}
if order.status == "cancelled" {
return Err(CommandError::ValidationFailed(
"Order already cancelled".into()
));
}
// Parse command body
let cmd: CancelOrderCommand =
serde_json::from_slice(&command.body.unwrap())?;
// Emit event
let event = Event {
id: Uuid::new_v4().to_string(),
source: "order-processor".into(),
type_: "OrderCancelled".into(),
subject: Some(order_id.into()),
data: serde_json::to_vec(&OrderCancelled {
order_id: order_id.into(),
reason: cmd.cancellation_reason,
refund_amount: cmd.refund_amount,
cancelled_at: Utc::now().to_rfc3339(),
})?,
};
Ok(vec![event])
}
_ => Err(CommandError::UnknownCommand(action.into())),
}
}
}
target/wasm32-wasi/release/order_processor.wasm
# Compile to WASM component
cargo component build --release

State Changes support optimistic concurrency through last_seen_revision:

// Retry pattern for concurrent modifications
loop {
// Get current state
let database = app.latest_database("my-events").await?;
let current_revision = database.revision();
// Execute with revision check
match app.execute_state_change(
"my-events",
"process-order",
1,
Some(current_revision), // Ensure no concurrent changes
request.clone(),
).await {
Ok(batch) => break Ok(batch),
Err(StateChangeExecutionError::ConcurrencyConflict) => {
// Retry with updated state
continue;
}
Err(e) => break Err(e),
}
}

State Changes access current state through the StateReader:

trait StateReader {
// Get State View at current revision
fn get_state_view(
&self,
view_name: &str,
parameters: &[(&str, &str)],
) -> Result<Vec<u8>, Error>;
// Query events directly
fn query_events(
&self,
selector: EventSelector,
limit: usize,
) -> Result<Vec<Event>, Error>;
// Get database metadata
fn get_database_info(&self) -> DatabaseInfo;
}
enum CommandError {
ValidationFailed(String), // Business rule violation
UnknownCommand(String), // Unrecognized action
InvalidInput(String), // Malformed request
StateNotFound(String), // Required state missing
InternalError(String), // Processing failure
}
use evidentsource_api_wasm::StateChangeExecutionError;
match app.execute_state_change(...).await {
Ok(batch) => {
println!("Success: {} events", batch.event_count());
}
Err(StateChangeExecutionError::ValidationError(msg)) => {
eprintln!("Validation failed: {}", msg);
}
Err(StateChangeExecutionError::ConcurrencyConflict) => {
eprintln!("Database changed, retry needed");
}
Err(StateChangeExecutionError::StateChangeNotFound(name)) => {
eprintln!("Handler {} not found", name);
}
Err(e) => eprintln!("Execution failed: {}", e),
}

Run multiple versions simultaneously:

// Version 1 - stable
let v1_result = app.execute_state_change(
"my-events", "process-order", 1, None, request.clone()
).await?;
// Version 2 - testing
let v2_result = app.execute_state_change(
"my-events", "process-order", 2, None, request.clone()
).await?;
// Compare results
assert_eq!(v1_result.event_count(), v2_result.event_count());
use rand::Rng;
// Route percentage of traffic to new version
let version = if rand::thread_rng().gen_range(0..100) < 10 {
2 // 10% to new version
} else {
1 // 90% to stable version
};
app.execute_state_change(
"my-events", "process-order", version, None, request
).await?;
// Saga coordinator State Change
impl Guest for SagaCoordinator {
fn decide(
state_reader: StateReader,
command: CommandRequest,
) -> Result<Vec<Event>, CommandError> {
let saga_id = Uuid::new_v4();
match command.path.as_str() {
"/sagas/order-fulfillment/start" => {
Ok(vec![
Event::saga_started(saga_id, vec![
"reserve-inventory",
"charge-payment",
"schedule-shipping",
]),
Event::saga_step_initiated(saga_id, "reserve-inventory"),
])
}
"/sagas/step/completed" => {
let saga_state = load_saga_state(&state_reader, saga_id)?;
let next_step = saga_state.next_pending_step();
match next_step {
Some(step) => Ok(vec![
Event::saga_step_completed(saga_id, step),
Event::saga_step_initiated(saga_id, next_step),
]),
None => Ok(vec![
Event::saga_completed(saga_id),
]),
}
}
_ => Err(CommandError::UnknownCommand(command.path)),
}
}
}
// Process multiple items in one command
#[derive(Deserialize)]
struct BatchUpdateCommand {
updates: Vec<ItemUpdate>,
}
impl Guest for BatchProcessor {
fn decide(
state_reader: StateReader,
command: CommandRequest,
) -> Result<Vec<Event>, CommandError> {
let batch: BatchUpdateCommand =
serde_json::from_slice(&command.body.unwrap())?;
let mut events = Vec::new();
for update in batch.updates {
// Validate each item
let item_state = state_reader.get_state_view(
"item-summary",
&[("item_id", &update.item_id)]
)?;
if validate_update(&item_state, &update)? {
events.push(Event::item_updated(update));
}
}
Ok(events)
}
}
// Cache frequently accessed state
struct CachingStateReader {
inner: StateReader,
cache: HashMap<String, Vec<u8>>,
}
impl CachingStateReader {
fn get_state_view_cached(
&mut self,
view_name: &str,
parameters: &[(&str, &str)],
) -> Result<Vec<u8>, Error> {
let key = format!("{}:{:?}", view_name, parameters);
if let Some(cached) = self.cache.get(&key) {
return Ok(cached.clone());
}
let state = self.inner.get_state_view(view_name, parameters)?;
self.cache.insert(key, state.clone());
Ok(state)
}
}
// Generate multiple events efficiently
fn generate_order_events(order: &Order) -> Vec<Event> {
let mut events = Vec::with_capacity(3);
events.push(Event::order_confirmed(order));
events.push(Event::inventory_reserved(order));
events.push(Event::payment_initiated(order));
events
}
  1. Idempotent Commands: Design commands to be safely retryable
  2. Validate Early: Check preconditions before expensive operations
  3. Clear Errors: Return specific, actionable error messages
  4. Version Carefully: Test new versions thoroughly before deployment
  5. Monitor Performance: Track execution time and error rates
  6. Use Transactions: Leverage batch transactions for consistency
  7. Cache State: Avoid redundant state loads within a command