gRPC API
The gRPC API provides high-performance, type-safe access to all EvidentSource functionality. It’s the recommended API for production applications and client libraries.
Service Definition
Section titled “Service Definition”
The complete service is defined in `service.proto`:
service EvidentSource { // Database operations rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse); rpc DeleteDatabase(DeleteDatabaseRequest) returns (DeleteDatabaseResponse); rpc FetchCatalog(FetchCatalogRequest) returns (FetchCatalogResponse);
// Event operations rpc Transact(TransactionRequest) returns (TransactionReply); rpc QueryEvents(QueryEventsRequest) returns (stream Event); rpc ScanDatabaseLog(ScanDatabaseLogRequest) returns (stream Event);
// Database state rpc FetchLatestDatabase(FetchLatestDatabaseRequest) returns (Database); rpc FetchDatabaseAtRevision(FetchDatabaseAtRevisionRequest) returns (Database); rpc DatabaseEffectiveAtTimestamp(DatabaseEffectiveAtTimestampRequest) returns (Database);
// State views rpc DefineStateView(DefineStateViewRequest) returns (DefineStateViewResponse); rpc FetchStateView(FetchStateViewRequest) returns (State); rpc FetchStateViewAtRevision(FetchStateViewAtRevisionRequest) returns (State);
// State changes rpc DefineStateChange(DefineStateChangeRequest) returns (DefineStateChangeResponse); rpc ProcessStateChange(ProcessStateChangeRequest) returns (ProcessStateChangeResponse);}Connection Setup
Section titled “Connection Setup”
Basic Connection
Section titled “Basic Connection”use tonic::transport::Channel;use evidentsource::proto::evident_source_client::EvidentSourceClient;
// Development (insecure)let channel = Channel::from_static("http://localhost:8080") .connect() .await?;
let client = EvidentSourceClient::new(channel);Secure Connection
Section titled “Secure Connection”use tonic::transport::{Channel, ClientTlsConfig};
// With TLSlet tls = ClientTlsConfig::new();let channel = Channel::from_static("https://api.example.com") .tls_config(tls)? .connect() .await?;
let client = EvidentSourceClient::new(channel);Database Operations
Section titled “Database Operations”
Create Database
Section titled “Create Database”
Creates a new event database.
message CreateDatabaseRequest { string database_name = 1;}
message CreateDatabaseResponse { Database database = 1;}Example:
let request = CreateDatabaseRequest { database_name: "my-events".into(),};
let response = client.create_database(request).await?;println!("Created database: {}", response.into_inner().database.unwrap().name);Errors:
- `ALREADY_EXISTS`: Database name already taken
- `INVALID_ARGUMENT`: Invalid database name format
Delete Database
Section titled “Delete Database”
Permanently deletes a database and all its events.
message DeleteDatabaseRequest { string database_name = 1;}
message DeleteDatabaseResponse { // Empty}Example:
let request = DeleteDatabaseRequest { database_name: "old-events".into(),};
client.delete_database(request).await?;Fetch Catalog
Section titled “Fetch Catalog”
Lists all databases.
message FetchCatalogRequest { // Empty}
message FetchCatalogResponse { repeated Database databases = 1;}Example:
let request = FetchCatalogRequest {};
let response = client.fetch_catalog(request).await?;for db in response.into_inner().databases { println!("Database: {}, Events: {}", db.name, db.latest_revision);}Event Operations
Section titled “Event Operations”
Transact
Section titled “Transact”
Atomically writes a transaction of events.
message TransactionRequest { string database_name = 1; Transaction transaction = 2;}
message Transaction { repeated Event events = 1; repeated Constraint constraints = 2;}
message TransactionReply { uint64 revision = 1;}Example:
use uuid::Uuid;use prost_types::Timestamp;
let event = Event { id: Uuid::new_v4().to_string(), spec_version: "1.0".into(), ty: "OrderCreated".into(), source: "order-service".into(), subject: Some("order-123".into()), time: Some(Timestamp::from(SystemTime::now())), data: Some(order_data), ..Default::default()};
let transaction = Transaction { events: vec![event], constraints: vec![Constraint { constraint: Some(constraint::Constraint::MinRevision( MinRevisionConstraint { revision: last_known_revision, selector: None } )), }],};
let request = TransactionRequest { database_name: "my-events".into(), transaction: Some(transaction),};
let response = client.transact(request).await?;println!("Events stored at revision: {}", response.into_inner().revision);Query Events
Section titled “Query Events”
Queries events with flexible filtering.
message QueryEventsRequest { string database_name = 1; EventQuery query = 2;}
message EventQuery { EventSelection event_selection = 1; RevisionRange revision_range = 2; EffectiveTimeRange effective_time_range = 3; Direction direction = 4; uint32 limit = 5;}Example:
let query = EventQuery { event_selection: Some(EventSelection { selectors: vec![EventSelector { stream: Some("order-service".into()), event_type: Some("OrderCreated".into()), ..Default::default() }], }), direction: Direction::Forward as i32, limit: 100, ..Default::default()};
let request = QueryEventsRequest { database_name: "my-events".into(), query: Some(query),};
// Stream resultslet mut stream = client.query_events(request).await?.into_inner();while let Some(event) = stream.message().await? { println!("Event {}: {}", event.id, event.ty);}Scan Database Log
Section titled “Scan Database Log”
Scans all events in revision order.
message ScanDatabaseLogRequest { string database_name = 1; uint64 start_revision = 2; Direction direction = 3; uint32 limit = 4;}Example:
let request = ScanDatabaseLogRequest { database_name: "my-events".into(), start_revision: 1, direction: Direction::Forward as i32, limit: 0,};
let mut stream = client.scan_database_log(request).await?.into_inner();while let Some(event) = stream.message().await? { if let Some(rev) = event.extensions.get("revision") { println!("Revision {}: {}", rev, event.ty); }}Constraints
Section titled “Constraints”
MinRevision Constraint
Section titled “MinRevision Constraint”
Ensures the database hasn’t changed since the last read.
message MinRevisionConstraint { uint64 revision = 1; EventSelector selector = 2; // Optional}MaxRevision Constraint
Section titled “MaxRevision Constraint”
Ensures events are appended in order.
message MaxRevisionConstraint { uint64 revision = 1; EventSelector selector = 2; // Optional}RevisionRange Constraint
Section titled “RevisionRange Constraint”
Ensures a specific revision range exists.
message RevisionRangeConstraint { uint64 start = 1; uint64 end = 2; EventSelector selector = 3; // Optional}State Views
Section titled “State Views”
Define State View
Section titled “Define State View”
Registers a new State View component.
message DefineStateViewRequest { string database_name = 1; StateViewDefinition definition = 2;}
message StateViewDefinition { string name = 1; string description = 2; EventSelection event_selection = 3; bytes wasm_component = 4; Priority priority = 5; string content_type = 6; string schema_id = 7;}Example:
let definition = StateViewDefinition { name: "order-summary".into(), description: "Summary of orders by status".into(), event_selection: Some(EventSelection { selectors: vec![EventSelector { event_type: Some("OrderCreated".into()), ..Default::default() }], }), wasm_component: wasm_bytes, priority: Priority::High as i32, content_type: "application/json".into(), ..Default::default()};
let request = DefineStateViewRequest { database_name: "my-events".into(), definition: Some(definition),};
client.define_state_view(request).await?;Fetch State View
Section titled “Fetch State View”
Retrieves the current state from a State View.
message FetchStateViewRequest { string database_name = 1; string state_view_name = 2; map<string, string> parameters = 3;}Example:
use std::collections::HashMap;
let mut parameters = HashMap::new();parameters.insert("customer_id".into(), "CUST-123".into());
let request = FetchStateViewRequest { database_name: "my-events".into(), state_view_name: "customer-profile".into(), parameters,};
let state = client.fetch_state_view(request).await?.into_inner();let profile: CustomerProfile = serde_json::from_slice(&state.bytes)?;State Changes
Section titled “State Changes”
Define State Change
Section titled “Define State Change”
Registers a command processor component.
message DefineStateChangeRequest { string database_name = 1; StateChangeDefinition definition = 2;}
message StateChangeDefinition { string name = 1; string description = 2; string state_view_name = 3; bytes wasm_component = 4;}Process State Change
Section titled “Process State Change”
Executes a command and returns events.
message ProcessStateChangeRequest { string database_name = 1; string state_change_name = 2; Command command = 3; map<string, string> parameters = 4;}
message ProcessStateChangeResponse { oneof result { Success success = 1; Error error = 2; }}
message Success { repeated Event events = 1; uint64 revision = 2;}Example:
let command_json = r#"{ "type": "CreateOrder", "customerId": "CUST-123", "items": [...]}"#;
let command = Command { content_type: "application/json".into(), bytes: command_json.as_bytes().to_vec(),};
let request = ProcessStateChangeRequest { database_name: "my-events".into(), state_change_name: "process-order".into(), command: Some(command), parameters: HashMap::new(),};
let response = client.process_state_change(request).await?.into_inner();match response.result { Some(process_state_change_response::Result::Success(success)) => { println!("Created {} events at revision {}", success.events.len(), success.revision); } Some(process_state_change_response::Result::Error(error)) => { println!("Error {}: {}", error.code, error.message); } None => {}}Error Handling
Section titled “Error Handling”
gRPC Status Codes
Section titled “gRPC Status Codes”
Common status codes returned:
| Code | Description | Example |
|---|---|---|
| OK | Success | Normal operation |
| INVALID_ARGUMENT | Bad request | Invalid event format |
| NOT_FOUND | Resource missing | Database doesn’t exist |
| ALREADY_EXISTS | Duplicate | Database name taken |
| FAILED_PRECONDITION | Constraint violation | Revision mismatch |
| RESOURCE_EXHAUSTED | Limit exceeded | Transaction too large |
| UNAVAILABLE | Service down | Temporary outage |
Error Details
Section titled “Error Details”use tonic::{Code, Status};
match client.transact(request).await { Ok(response) => { println!("Success: revision {}", response.into_inner().revision); } Err(status) => match status.code() { Code::FailedPrecondition => { // Constraint violation - retry with updated revision println!("Constraint failed: {}", status.message()); } Code::InvalidArgument => { // Bad request - fix and retry println!("Invalid request: {}", status.message()); } _ => return Err(status.into()), },}Performance Tips
Section titled “Performance Tips”
Connection Pooling
Section titled “Connection Pooling”use tonic::transport::Channel;
// Reuse channels across requests - Channel is Clone and cheap to clonelet channel = Channel::from_static("http://localhost:8080") .connect() .await?;
// Clone the channel for concurrent uselet client1 = EvidentSourceClient::new(channel.clone());let client2 = EvidentSourceClient::new(channel.clone());Streaming Large Results
Section titled “Streaming Large Results”// Stream events as they arrivelet mut stream = client.query_events(request).await?.into_inner();
while let Some(result) = stream.message().await? { // Process each event as it arrives process_event(result).await?;}Transaction Operations
Section titled “Transaction Operations”// Combine multiple events in one transactionlet events: Vec<Event> = (1..=100) .map(|i| create_event(i)) .collect();
let transaction = Transaction { events, constraints: vec![],};
// Single network round tripclient.transact(TransactionRequest { database_name: "my-events".into(), transaction: Some(transaction),}).await?;Client Configuration
Section titled “Client Configuration”
Timeouts
Section titled “Timeouts”use std::time::Duration;
let channel = Channel::from_static("http://localhost:8080") .timeout(Duration::from_secs(30)) .connect() .await?;Retry Policy
Section titled “Retry Policy”// Configure retry with tower middlewareuse tower::ServiceBuilder;
let channel = Channel::from_static("http://localhost:8080") .connect() .await?;
// Use tower-retry or custom retry logicCompression
Section titled “Compression”use tonic::codec::CompressionEncoding;
let mut client = EvidentSourceClient::new(channel) .send_compressed(CompressionEncoding::Gzip) .accept_compressed(CompressionEncoding::Gzip);Next Steps
Section titled “Next Steps”
- Explore the REST API for simpler integration
- See Client Libraries for language-specific guides
- Download example applications from the Customer Portal
- Read Performance Tuning for optimization