State Changes

State Changes are WebAssembly components that handle commands in EvidentSource, implementing the write-side of CQRS (Command Query Responsibility Segregation). They process incoming HTTP-like requests, validate them, and emit events that modify the system state.

State Changes serve as command handlers that:

  • Accept REST-like HTTP requests with path parameters
  • Query current state via State Views for decision making
  • Emit CloudEvents to record state transitions
  • Enforce business rules through transaction constraints
  • Execute in a sandboxed WebAssembly environment with resource limits

State Changes use REST-like path templates to define their routing patterns:

/accounts/{account_id}/transfer
/users/{user_id}/posts/{post_id}
/orders/{order_id}/cancel

Path parameters are automatically extracted and made available to your component.
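
As an illustration of what this extraction involves, the sketch below matches a request path against a template segment by segment and collects the `{name}` placeholders. The function name `match_path_template` is hypothetical and the logic is an assumption about how such matching could work, not EvidentSource's actual router.

```rust
use std::collections::HashMap;

/// Matches `path` against `template` and returns the extracted parameters,
/// or `None` if the path does not fit the template.
/// Hypothetical helper for illustration only.
fn match_path_template(template: &str, path: &str) -> Option<HashMap<String, String>> {
    let template_segments: Vec<&str> = template.trim_matches('/').split('/').collect();
    let path_segments: Vec<&str> = path.trim_matches('/').split('/').collect();

    // A path with a different number of segments can never match.
    if template_segments.len() != path_segments.len() {
        return None;
    }

    let mut params = HashMap::new();
    for (&tpl, &actual) in template_segments.iter().zip(path_segments.iter()) {
        if let Some(name) = tpl.strip_prefix('{').and_then(|rest| rest.strip_suffix('}')) {
            // Placeholder segment: bind the parameter, rejecting empty values.
            if actual.is_empty() {
                return None;
            }
            params.insert(name.to_string(), actual.to_string());
        } else if tpl != actual {
            // Literal segment: must match exactly.
            return None;
        }
    }
    Some(params)
}
```

Under these assumptions, matching `/accounts/acc-123/transfer` against `/accounts/{account_id}/transfer` yields a map with `account_id` set to `acc-123`, while `/orders/42/ship` does not match `/orders/{order_id}/cancel`.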

Each request passes through the following steps:

  1. Request Reception: An HTTP-like request arrives at the path template endpoint
  2. Parameter Extraction: Path parameters are extracted from the URL
  3. State Query: The component can query State Views to read the current state
  4. Business Logic: The component validates the command and decides which events to emit
  5. Event Emission: Zero or more CloudEvents are emitted
  6. Constraint Definition: Transaction constraints ensure consistency
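
The steps above can be sketched as a single pure function. The types below (`Event`, `Constraint`, `DecideError`, `TransferCommand`) are simplified, hypothetical stand-ins for the generated WIT bindings, and the `read_balance_cents` closure stands in for a State View query. Steps 1 and 2 are assumed to have been done by the host before `decide` is called.

```rust
/// Hypothetical, simplified stand-ins for the generated WIT types.
#[derive(Debug, PartialEq)]
struct Event {
    event_type: String,
    subject: String,
    amount_cents: u64,
}

#[derive(Debug, PartialEq)]
struct Constraint {
    subject: String,
    revision: u64,
}

#[derive(Debug, PartialEq)]
struct DecideError {
    error_type: &'static str,
    message: String,
}

/// A command after steps 1 and 2: the request has been received and
/// `account_id` has been extracted from the path.
struct TransferCommand {
    account_id: String,
    amount_cents: u64,
    seen_revision: u64,
}

fn decide(
    cmd: &TransferCommand,
    read_balance_cents: impl Fn(&str) -> u64,
) -> Result<(Vec<Event>, Vec<Constraint>), DecideError> {
    // Step 4 (validation): reject malformed commands before touching state.
    if cmd.amount_cents == 0 {
        return Err(DecideError {
            error_type: "validation",
            message: "Transfer amount must be positive".to_string(),
        });
    }

    // Step 3: read the current state (stand-in for a State View query).
    let balance = read_balance_cents(cmd.account_id.as_str());

    // Step 4 (business rule): the account must cover the transfer.
    if balance < cmd.amount_cents {
        return Err(DecideError {
            error_type: "validation",
            message: "Insufficient balance".to_string(),
        });
    }

    // Step 5: describe what happened as an event.
    let subject = format!("account:{}", cmd.account_id);
    let event = Event {
        event_type: "account.transfer.completed".to_string(),
        subject: subject.clone(),
        amount_cents: cmd.amount_cents,
    };

    // Step 6: tie the decision to the revision it was based on.
    let constraint = Constraint { subject, revision: cmd.seen_revision };

    Ok((vec![event], vec![constraint]))
}
```

Keeping the decision logic free of I/O like this makes it straightforward to unit test outside the WebAssembly sandbox.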

State Changes run with configurable resource limits:

  • Memory: Default 100MB limit
  • Timeout: Default 30 seconds
  • Epoch Interruption: Efficient timeout mechanism using WebAssembly epochs
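
To give a feel for why epoch-based timeouts are cheap, here is a conceptual model in plain Rust. Runtimes such as Wasmtime implement epoch interruption inside compiled WebAssembly code; whether EvidentSource uses Wasmtime is an assumption, and `EpochClock` and `run_with_deadline` are hypothetical names. The point is only that the running code does a single counter comparison per check, while a host timer advances the counter.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// A global epoch counter that a host timer would bump at a fixed interval.
struct EpochClock {
    current: AtomicU64,
}

impl EpochClock {
    fn new() -> Self {
        Self { current: AtomicU64::new(0) }
    }

    /// Called by the host's timer (for example, once per millisecond).
    fn tick(&self) {
        self.current.fetch_add(1, Ordering::Relaxed);
    }

    fn now(&self) -> u64 {
        self.current.load(Ordering::Relaxed)
    }
}

#[derive(Debug, PartialEq)]
struct DeadlineExceeded;

/// Runs `step` until it reports completion (`true`) or the epoch deadline
/// passes. Returns the number of steps executed on success.
fn run_with_deadline(
    clock: &EpochClock,
    budget_ticks: u64,
    mut step: impl FnMut() -> bool,
) -> Result<u64, DeadlineExceeded> {
    let deadline = clock.now() + budget_ticks;
    let mut steps = 0;
    loop {
        // The check is one relaxed atomic load and one comparison.
        if clock.now() >= deadline {
            return Err(DeadlineExceeded);
        }
        steps += 1;
        if step() {
            return Ok(steps);
        }
    }
}
```

In this model the guest never reads a wall clock; a 30-second limit would simply be a tick budget derived from the timer interval.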

State Changes implement the state-change world defined in WIT:

package evidentsource:decider@0.1.0;

world state-change {
    import query-state-view;
    export decide;
}

Here’s a complete example of a bank account transfer State Change:

src/lib.rs
use serde::{Deserialize, Serialize};
use chrono::Utc;

wit_bindgen::generate!({
    world: "state-change",
    path: "../../interface",
});

use exports::evidentsource::decider::state_change_exports::Guest;
use evidentsource::decider::cloudevents::{Cloudevent, CloudeventData};
use evidentsource::decider::state_change_types::{
    TransactionConstraint, CommandRequest, DecideError, DecideResult,
    StateChangeContext, EventAttribute, EventSelector, MinConstraint,
};

struct BankTransferHandler;

#[derive(Serialize, Deserialize)]
struct TransferRequest {
    to_account: String,
    amount: f64,
    description: Option<String>,
}

#[derive(Serialize, Deserialize)]
struct TransferEvent {
    from_account: String,
    to_account: String,
    amount: f64,
    description: Option<String>,
    timestamp: String,
}

impl Guest for BankTransferHandler {
    fn decide(
        context: StateChangeContext,
        request: CommandRequest,
    ) -> DecideResult {
        // Extract account_id from path (assumes /accounts/{account_id}/transfer).
        // Splitting on '/' yields ["", "accounts", "<account_id>", "transfer"].
        let path_parts: Vec<&str> = request.path.split('/').collect();
        let from_account = if path_parts.len() >= 3 {
            path_parts[2].to_string()
        } else {
            return Err(DecideError {
                error_type: "validation".to_string(),
                message: "Invalid path format".to_string(),
            });
        };

        // Parse request body
        let body = request.body.ok_or_else(|| DecideError {
            error_type: "validation".to_string(),
            message: "Request body required".to_string(),
        })?;
        let transfer_request: TransferRequest =
            serde_json::from_slice(&body).map_err(|e| DecideError {
                error_type: "validation".to_string(),
                message: format!("Invalid request body: {}", e),
            })?;

        // Validate amount
        if transfer_request.amount <= 0.0 {
            return Err(DecideError {
                error_type: "validation".to_string(),
                message: "Transfer amount must be positive".to_string(),
            });
        }

        // Query current account balance
        let balance_params = vec![
            ("account_id".to_string(), EventAttribute::Subject(Some(from_account.clone()))),
        ];
        let balance_view = query_state_view(
            context.database.clone(),
            "account-balance".to_string(),
            1, // version
            context.database_revision,
            balance_params,
            None, // no effective time
        ).map_err(|e| DecideError {
            error_type: "internal".to_string(),
            message: format!("Failed to query balance: {}", e),
        })?;

        // Check sufficient balance. A missing view, missing content, or
        // unparsable content is treated as a zero balance, so the transfer
        // is rejected instead of silently skipping the check.
        let current_balance = balance_view
            .and_then(|view| view.content)
            .and_then(|content| serde_json::from_slice::<serde_json::Value>(&content).ok())
            .and_then(|data| data["balance"].as_f64())
            .unwrap_or(0.0);
        if current_balance < transfer_request.amount {
            return Err(DecideError {
                error_type: "validation".to_string(),
                message: "Insufficient balance".to_string(),
            });
        }

        // Create transfer event. Capture the time once so the event ID and
        // the payload timestamp agree.
        let now = Utc::now();
        let transfer_event = TransferEvent {
            from_account: from_account.clone(),
            to_account: transfer_request.to_account.clone(),
            amount: transfer_request.amount,
            description: transfer_request.description,
            timestamp: now.to_rfc3339(),
        };
        let payload = serde_json::to_string(&transfer_event).map_err(|e| DecideError {
            error_type: "internal".to_string(),
            message: format!("Failed to serialize event: {}", e),
        })?;
        let event = Cloudevent {
            id: format!("transfer-{}-{}", from_account, now.timestamp()),
            source: format!("accounts/{}/transfers", from_account),
            type_: "account.transfer.completed".to_string(),
            subject: Some(format!("account:{}", from_account)),
            time: None,
            data: Some(CloudeventData::String(payload)),
            attributes: vec![],
        };

        // Add optimistic concurrency control
        // Ensure no other transfers have occurred since we read the balance
        let constraint = TransactionConstraint::Min(MinConstraint {
            selector: EventSelector::Equals(
                EventAttribute::Subject(Some(format!("account:{}", from_account)))
            ),
            revision: context.last_seen_revision,
        });

        Ok((vec![event], vec![constraint]))
    }
}
export!(BankTransferHandler);

Create a Cargo.toml:

[package]
name = "bank-transfer-handler"
version = "0.1.0"
edition = "2021"

[dependencies]
wit-bindgen = "0.38"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
chrono = { version = "0.4", features = ["serde"] }

[lib]
crate-type = ["cdylib"]

[profile.release]
opt-level = "z"
lto = true
codegen-units = 1

Build the component:

cargo build --target wasm32-wasip2 --release

Upload the compiled WASM component:

# Create the state change
# (curl sets the multipart Content-Type header, including the boundary, for -F)
curl -X POST http://localhost:8080/db/mybank/admin/state-changes \
  -F "name=bank-transfer" \
  -F "version=1" \
  -F "description=Handles account transfers" \
  -F "path_template=/accounts/{account_id}/transfer" \
  -F "wasm=@target/wasm32-wasip2/release/bank_transfer_handler.wasm"

Alternatively, register State Changes programmatically through the gRPC API.

Once deployed, test your State Change:

# Execute a transfer
curl -X POST http://localhost:8080/db/mybank/accounts/acc-123/transfer \
  -H "Content-Type: application/json" \
  -H "X-Evident-Revision: 42" \
  -d '{
    "to_account": "acc-456",
    "amount": 100.00,
    "description": "Payment for services"
  }'

Successful execution returns the emitted events:

{
  "events": [
    {
      "id": "transfer-acc-123-1703123456",
      "source": "accounts/acc-123/transfers",
      "type": "account.transfer.completed",
      "subject": "account:acc-123",
      "data": {
        "from_account": "acc-123",
        "to_account": "acc-456",
        "amount": 100.00,
        "description": "Payment for services",
        "timestamp": "2024-01-15T10:30:00Z"
      }
    }
  ],
  "revision": 43
}

State Changes can query State Views to make decisions based on current state:

let params = vec![
    ("user_id".to_string(), EventAttribute::Subject(Some(user_id))),
];
let user_profile = query_state_view(
    database,
    "user-profile".to_string(),
    1, // version
    revision,
    params,
    None, // effective time
)?;

Ensure consistency with transaction constraints:

// Minimum revision constraint - ensure state hasn't changed
TransactionConstraint::Min(MinConstraint {
    selector: EventSelector::Equals(
        EventAttribute::Stream("accounts".to_string())
    ),
    revision: last_seen_revision,
})

// Maximum revision constraint - ensure entity exists
TransactionConstraint::Max(MaxConstraint {
    selector: EventSelector::Equals(
        EventAttribute::Subject(Some(entity_id))
    ),
    revision: u64::MAX,
})

// Range constraint - specific revision range
TransactionConstraint::Range(RangeConstraint {
    selector: EventSelector::StartsWith(
        EventAttribute::EventType("order.".to_string())
    ),
    min: 100,
    max: 200,
})
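
The general idea behind such constraints is optimistic concurrency: a write is accepted only if the state it was based on is still current. The toy in-memory model below illustrates that idea in isolation. It is a generic sketch, not EvidentSource's implementation, and it does not reproduce the exact semantics of the Min, Max, or Range constraints; `Stream`, `AppendError`, and `append_if_unchanged` are hypothetical names.

```rust
#[derive(Debug, PartialEq)]
enum AppendError {
    /// Someone else wrote after the caller read the stream.
    Conflict { expected: u64, actual: u64 },
}

/// Toy in-memory event log for a single subject.
struct Stream {
    events: Vec<String>,
}

impl Stream {
    /// The revision is simply the number of events appended so far.
    fn revision(&self) -> u64 {
        self.events.len() as u64
    }

    /// Appends only if nobody has written since `expected_revision` was read.
    /// Returns the new revision on success.
    fn append_if_unchanged(
        &mut self,
        expected_revision: u64,
        event: &str,
    ) -> Result<u64, AppendError> {
        let actual = self.revision();
        if actual != expected_revision {
            return Err(AppendError::Conflict { expected: expected_revision, actual });
        }
        self.events.push(event.to_string());
        Ok(self.revision())
    }
}
```

A writer that receives a conflict would typically re-read the state, re-run its decision, and retry with the newer revision.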

Return appropriate error types:

  • validation: Input validation failures
  • unauthorized: Permission denied
  • conflict: Optimistic concurrency conflicts
  • internal: System errors

Err(DecideError {
    error_type: "validation".to_string(),
    message: "Amount must be positive".to_string(),
})
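
Because `error_type` is a free-form string, a typo such as "validaton" would go unnoticed by the compiler. One way to guard against that, sketched here with a local stand-in for the generated `DecideError` record, is a small enum plus a constructor. `ErrorKind` and `decide_error` are hypothetical helpers, not part of the EvidentSource bindings.

```rust
/// The four error types listed above, as a closed set.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ErrorKind {
    Validation,
    Unauthorized,
    Conflict,
    Internal,
}

impl ErrorKind {
    /// The string expected in the `error_type` field.
    fn as_str(self) -> &'static str {
        match self {
            ErrorKind::Validation => "validation",
            ErrorKind::Unauthorized => "unauthorized",
            ErrorKind::Conflict => "conflict",
            ErrorKind::Internal => "internal",
        }
    }
}

/// Local stand-in for the generated `DecideError` record.
#[derive(Debug, PartialEq)]
struct DecideError {
    error_type: String,
    message: String,
}

/// Builds a `DecideError` from a typed kind and any string-like message.
fn decide_error(kind: ErrorKind, message: impl Into<String>) -> DecideError {
    DecideError {
        error_type: kind.as_str().to_string(),
        message: message.into(),
    }
}
```

With this in place, an early return reads `return Err(decide_error(ErrorKind::Validation, "Amount must be positive"));`.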

When designing State Changes, follow these practices:

  1. Idempotency: Design commands to be idempotent when possible
  2. Event Granularity: Emit fine-grained events for better auditability
  3. State Queries: Only query necessary state to minimize latency
  4. Error Messages: Provide clear, actionable error messages
  5. Path Design: Use RESTful conventions for path templates
  6. Versioning: Version your State Changes for backward compatibility
  7. Testing: Test with various input combinations and edge cases
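
One way to approach idempotency is to derive the event ID from the command itself rather than from the clock, so that a retried request produces the same ID. The sketch below assumes the client supplies an idempotency key and that duplicate event IDs can be detected downstream; both are assumptions, and `transfer_event_id` is a hypothetical helper. It uses a hand-written FNV-1a hash because it is deterministic across runs and needs no dependencies.

```rust
/// 64-bit FNV-1a hash: deterministic and dependency-free.
fn fnv1a_64(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf29ce484222325; // FNV offset basis
    for &byte in bytes {
        hash ^= u64::from(byte);
        hash = hash.wrapping_mul(0x100000001b3); // FNV prime
    }
    hash
}

/// Derives a stable event ID from the account and a client-supplied
/// idempotency key (hypothetical; not part of the documented request format).
fn transfer_event_id(idempotency_key: &str, from_account: &str) -> String {
    // The unit separator keeps ("ab", "c") distinct from ("a", "bc").
    let material = format!("{}\u{1f}{}", from_account, idempotency_key);
    format!("transfer-{}-{:016x}", from_account, fnv1a_64(material.as_bytes()))
}
```

Compared with an ID built from a Unix timestamp, this avoids both accidental collisions within the same second and distinct IDs for what is logically the same command.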

Here’s a complete example you can use to test State Changes:

order-processor/src/lib.rs
use serde::{Deserialize, Serialize};
use chrono::Utc;

wit_bindgen::generate!({
    world: "state-change",
    path: "../../interface",
});

use exports::evidentsource::decider::state_change_exports::Guest;
use evidentsource::decider::cloudevents::{Cloudevent, CloudeventData};
use evidentsource::decider::state_change_types::{
    TransactionConstraint, CommandRequest, DecideError, DecideResult,
    StateChangeContext, EventAttribute, EventSelector, MaxConstraint,
};

struct OrderProcessor;

#[derive(Serialize, Deserialize)]
struct CreateOrderRequest {
    customer_id: String,
    items: Vec<OrderItem>,
    shipping_address: String,
}

#[derive(Serialize, Deserialize)]
struct OrderItem {
    product_id: String,
    quantity: u32,
    price: f64,
}

#[derive(Serialize, Deserialize)]
struct OrderCreatedEvent {
    order_id: String,
    customer_id: String,
    items: Vec<OrderItem>,
    total_amount: f64,
    shipping_address: String,
    status: String,
    created_at: String,
}

impl Guest for OrderProcessor {
    fn decide(
        context: StateChangeContext,
        request: CommandRequest,
    ) -> DecideResult {
        // Route based on HTTP method
        match request.method.as_str() {
            "POST" => create_order(context, request),
            "PUT" => update_order(context, request),
            "DELETE" => cancel_order(context, request),
            _ => Err(DecideError {
                error_type: "validation".to_string(),
                message: format!("Unsupported method: {}", request.method),
            }),
        }
    }
}

fn create_order(
    _context: StateChangeContext,
    request: CommandRequest,
) -> DecideResult {
    // Parse request body
    let body = request.body.ok_or_else(|| DecideError {
        error_type: "validation".to_string(),
        message: "Request body required".to_string(),
    })?;
    let order_request: CreateOrderRequest =
        serde_json::from_slice(&body).map_err(|e| DecideError {
            error_type: "validation".to_string(),
            message: format!("Invalid request: {}", e),
        })?;

    // Validate items
    if order_request.items.is_empty() {
        return Err(DecideError {
            error_type: "validation".to_string(),
            message: "Order must contain at least one item".to_string(),
        });
    }

    // Calculate total
    let total_amount: f64 = order_request.items
        .iter()
        .map(|item| item.price * item.quantity as f64)
        .sum();

    // Generate order ID. Capture the time once so the ID and the
    // created_at timestamp agree.
    let now = Utc::now();
    let order_id = format!("ORD-{}", now.timestamp());

    // Create order created event
    let event_data = OrderCreatedEvent {
        order_id: order_id.clone(),
        customer_id: order_request.customer_id.clone(),
        items: order_request.items,
        total_amount,
        shipping_address: order_request.shipping_address,
        status: "pending".to_string(),
        created_at: now.to_rfc3339(),
    };
    let event = Cloudevent {
        id: format!("order-created-{}", order_id),
        source: "orders".to_string(),
        type_: "order.created".to_string(),
        subject: Some(format!("order:{}", order_id)),
        time: None,
        data: Some(CloudeventData::String(
            serde_json::to_string(&event_data).unwrap()
        )),
        attributes: vec![
            ("customer_id".to_string(), order_request.customer_id.clone()),
            ("total_amount".to_string(), total_amount.to_string()),
        ],
    };

    // Ensure customer exists (has at least one event)
    let constraint = TransactionConstraint::Max(MaxConstraint {
        selector: EventSelector::Equals(
            EventAttribute::Subject(Some(format!("customer:{}", order_request.customer_id)))
        ),
        revision: u64::MAX,
    });

    Ok((vec![event], vec![constraint]))
}

fn update_order(
    _context: StateChangeContext,
    _request: CommandRequest,
) -> DecideResult {
    // Placeholder: order updates are not implemented in this example,
    // so no events or constraints are returned.
    Ok((vec![], vec![]))
}

fn cancel_order(
    _context: StateChangeContext,
    request: CommandRequest,
) -> DecideResult {
    // Extract order_id from path (assumes /orders/{order_id}).
    // Splitting on '/' yields ["", "orders", "<order_id>"].
    let path_parts: Vec<&str> = request.path.split('/').collect();
    let order_id = if path_parts.len() >= 3 {
        path_parts[2].to_string()
    } else {
        return Err(DecideError {
            error_type: "validation".to_string(),
            message: "Invalid path format".to_string(),
        });
    };

    // Create cancellation event
    let now = Utc::now();
    let event = Cloudevent {
        id: format!("order-cancelled-{}-{}", order_id, now.timestamp()),
        source: "orders".to_string(),
        type_: "order.cancelled".to_string(),
        subject: Some(format!("order:{}", order_id)),
        time: None,
        data: Some(CloudeventData::String(
            serde_json::to_string(&serde_json::json!({
                "order_id": order_id,
                "cancelled_at": now.to_rfc3339(),
                "reason": "Customer requested"
            })).unwrap()
        )),
        attributes: vec![],
    };

    Ok((vec![event], vec![]))
}
export!(OrderProcessor);

  1. Deploy the State Change:

curl -X POST http://localhost:8080/db/mystore/admin/state-changes \
  -F "name=order-processor" \
  -F "version=1" \
  -F "path_template=/orders/{order_id}" \
  -F "wasm=@order_processor.wasm"

  2. Create an Order:

curl -X POST http://localhost:8080/db/mystore/orders/new \
  -H "Content-Type: application/json" \
  -d '{
    "customer_id": "CUST-123",
    "items": [
      {"product_id": "PROD-A", "quantity": 2, "price": 29.99},
      {"product_id": "PROD-B", "quantity": 1, "price": 49.99}
    ],
    "shipping_address": "123 Main St, City, Country"
  }'

  3. Cancel an Order:

curl -X DELETE http://localhost:8080/db/mystore/orders/ORD-1703123456

State Changes emit metrics for monitoring:

  • Execution time
  • Memory usage
  • Success/failure rates
  • Event emission counts

Failed executions are logged with:

  • Request details
  • Error type and message
  • Stack traces (in debug mode)
  • Resource usage at failure

When testing a State Change:

  1. Test with invalid inputs to verify validation
  2. Test concurrent requests to verify constraints
  3. Monitor resource usage under load
  4. Verify idempotency with duplicate requests
  5. Test error scenarios and recovery
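
Invalid-input testing is easiest when validation lives in a pure function that can be exercised without deploying a component. The sketch below shows one possible shape for order items. The order example in this document only checks for an empty item list; the quantity and price checks here are illustrative additions, and `ValidationError`, `validate_items`, and `item` are hypothetical names.

```rust
#[derive(Debug, PartialEq)]
enum ValidationError {
    EmptyOrder,
    ZeroQuantity { product_id: String },
    InvalidPrice { product_id: String },
}

struct OrderItem {
    product_id: String,
    quantity: u32,
    price: f64,
}

/// Pure validation: no I/O, so it can be tested with plain assertions.
fn validate_items(items: &[OrderItem]) -> Result<(), ValidationError> {
    if items.is_empty() {
        return Err(ValidationError::EmptyOrder);
    }
    for item in items {
        if item.quantity == 0 {
            return Err(ValidationError::ZeroQuantity {
                product_id: item.product_id.clone(),
            });
        }
        // Rejects NaN, infinities, zero, and negative prices.
        if !item.price.is_finite() || item.price <= 0.0 {
            return Err(ValidationError::InvalidPrice {
                product_id: item.product_id.clone(),
            });
        }
    }
    Ok(())
}

/// Small constructor to keep test cases readable.
fn item(product_id: &str, quantity: u32, price: f64) -> OrderItem {
    OrderItem { product_id: product_id.to_string(), quantity, price }
}
```

Each edge case then becomes a one-line assertion, for example checking that `validate_items(&[item("PROD-A", 0, 29.99)])` returns the `ZeroQuantity` error.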

State Changes are subject to the following limits:

  • Maximum execution time: 30 seconds (configurable)
  • Maximum memory: 100MB (configurable)
  • No direct network access from components
  • No filesystem access from components
  • State queries are read-only