Skip to content

gRPC API

The gRPC API provides high-performance, type-safe access to all EvidentSource functionality. It’s the recommended API for production applications and client libraries.

The complete service is defined in service.proto:

service EvidentSource {
// Database operations
rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse);
rpc DeleteDatabase(DeleteDatabaseRequest) returns (DeleteDatabaseResponse);
rpc FetchCatalog(FetchCatalogRequest) returns (FetchCatalogResponse);
// Event operations
rpc Transact(TransactionRequest) returns (TransactionReply);
rpc QueryEvents(QueryEventsRequest) returns (stream Event);
rpc ScanDatabaseLog(ScanDatabaseLogRequest) returns (stream Event);
// Database state
rpc FetchLatestDatabase(FetchLatestDatabaseRequest) returns (Database);
rpc FetchDatabaseAtRevision(FetchDatabaseAtRevisionRequest) returns (Database);
rpc DatabaseEffectiveAtTimestamp(DatabaseEffectiveAtTimestampRequest) returns (Database);
// State views
rpc DefineStateView(DefineStateViewRequest) returns (DefineStateViewResponse);
rpc FetchStateView(FetchStateViewRequest) returns (State);
rpc FetchStateViewAtRevision(FetchStateViewAtRevisionRequest) returns (State);
// State changes
rpc DefineStateChange(DefineStateChangeRequest) returns (DefineStateChangeResponse);
rpc ProcessStateChange(ProcessStateChangeRequest) returns (ProcessStateChangeResponse);
}
use tonic::transport::Channel;
use evidentsource::proto::evident_source_client::EvidentSourceClient;
// Development (insecure)
let channel = Channel::from_static("http://localhost:8080")
.connect()
.await?;
let client = EvidentSourceClient::new(channel);
use tonic::transport::{Channel, ClientTlsConfig};
// With TLS
let tls = ClientTlsConfig::new();
let channel = Channel::from_static("https://api.example.com")
.tls_config(tls)?
.connect()
.await?;
let client = EvidentSourceClient::new(channel);

`CreateDatabase` creates a new event database.

message CreateDatabaseRequest {
string database_name = 1;
}
message CreateDatabaseResponse {
Database database = 1;
}

Example:

let request = CreateDatabaseRequest {
database_name: "my-events".into(),
};
let response = client.create_database(request).await?;
println!("Created database: {}", response.into_inner().database.unwrap().name);

Errors:

  • ALREADY_EXISTS: Database name already taken
  • INVALID_ARGUMENT: Invalid database name format

`DeleteDatabase` permanently deletes a database and all its events.

message DeleteDatabaseRequest {
string database_name = 1;
}
message DeleteDatabaseResponse {
// Empty
}

Example:

let request = DeleteDatabaseRequest {
database_name: "old-events".into(),
};
client.delete_database(request).await?;

`FetchCatalog` lists all databases.

message FetchCatalogRequest {
// Empty
}
message FetchCatalogResponse {
repeated Database databases = 1;
}

Example:

let request = FetchCatalogRequest {};
let response = client.fetch_catalog(request).await?;
for db in response.into_inner().databases {
println!("Database: {}, Events: {}", db.name, db.latest_revision);
}

`Transact` atomically writes a transaction of events.

message TransactionRequest {
string database_name = 1;
Transaction transaction = 2;
}
message Transaction {
repeated Event events = 1;
repeated Constraint constraints = 2;
}
message TransactionReply {
uint64 revision = 1;
}

Example:

use uuid::Uuid;
use prost_types::Timestamp;
let event = Event {
id: Uuid::new_v4().to_string(),
spec_version: "1.0".into(),
ty: "OrderCreated".into(),
source: "order-service".into(),
subject: Some("order-123".into()),
time: Some(Timestamp::from(SystemTime::now())),
data: Some(order_data),
..Default::default()
};
let transaction = Transaction {
events: vec![event],
constraints: vec![Constraint {
constraint: Some(constraint::Constraint::MinRevision(
MinRevisionConstraint { revision: last_known_revision, selector: None }
)),
}],
};
let request = TransactionRequest {
database_name: "my-events".into(),
transaction: Some(transaction),
};
let response = client.transact(request).await?;
println!("Events stored at revision: {}", response.into_inner().revision);

`QueryEvents` queries events with flexible filtering.

message QueryEventsRequest {
string database_name = 1;
EventQuery query = 2;
}
message EventQuery {
EventSelection event_selection = 1;
RevisionRange revision_range = 2;
EffectiveTimeRange effective_time_range = 3;
Direction direction = 4;
uint32 limit = 5;
}

Example:

let query = EventQuery {
event_selection: Some(EventSelection {
selectors: vec![EventSelector {
stream: Some("order-service".into()),
event_type: Some("OrderCreated".into()),
..Default::default()
}],
}),
direction: Direction::Forward as i32,
limit: 100,
..Default::default()
};
let request = QueryEventsRequest {
database_name: "my-events".into(),
query: Some(query),
};
// Stream results
let mut stream = client.query_events(request).await?.into_inner();
while let Some(event) = stream.message().await? {
println!("Event {}: {}", event.id, event.ty);
}

`ScanDatabaseLog` scans all events in revision order.

message ScanDatabaseLogRequest {
string database_name = 1;
uint64 start_revision = 2;
Direction direction = 3;
uint32 limit = 4;
}

Example:

let request = ScanDatabaseLogRequest {
database_name: "my-events".into(),
start_revision: 1,
direction: Direction::Forward as i32,
limit: 0,
};
let mut stream = client.scan_database_log(request).await?.into_inner();
while let Some(event) = stream.message().await? {
if let Some(rev) = event.extensions.get("revision") {
println!("Revision {}: {}", rev, event.ty);
}
}

`MinRevisionConstraint` ensures the database hasn’t changed since the last read.

message MinRevisionConstraint {
uint64 revision = 1;
EventSelector selector = 2; // Optional
}

`MaxRevisionConstraint` ensures events are appended in order.

message MaxRevisionConstraint {
uint64 revision = 1;
EventSelector selector = 2; // Optional
}

`RevisionRangeConstraint` ensures a specific revision range exists.

message RevisionRangeConstraint {
uint64 start = 1;
uint64 end = 2;
EventSelector selector = 3; // Optional
}

`DefineStateView` registers a new State View component.

message DefineStateViewRequest {
string database_name = 1;
StateViewDefinition definition = 2;
}
message StateViewDefinition {
string name = 1;
string description = 2;
EventSelection event_selection = 3;
bytes wasm_component = 4;
Priority priority = 5;
string content_type = 6;
string schema_id = 7;
}

Example:

let definition = StateViewDefinition {
name: "order-summary".into(),
description: "Summary of orders by status".into(),
event_selection: Some(EventSelection {
selectors: vec![EventSelector {
event_type: Some("OrderCreated".into()),
..Default::default()
}],
}),
wasm_component: wasm_bytes,
priority: Priority::High as i32,
content_type: "application/json".into(),
..Default::default()
};
let request = DefineStateViewRequest {
database_name: "my-events".into(),
definition: Some(definition),
};
client.define_state_view(request).await?;

`FetchStateView` retrieves the current state from a State View.

message FetchStateViewRequest {
string database_name = 1;
string state_view_name = 2;
map<string, string> parameters = 3;
}

Example:

use std::collections::HashMap;
let mut parameters = HashMap::new();
parameters.insert("customer_id".into(), "CUST-123".into());
let request = FetchStateViewRequest {
database_name: "my-events".into(),
state_view_name: "customer-profile".into(),
parameters,
};
let state = client.fetch_state_view(request).await?.into_inner();
let profile: CustomerProfile = serde_json::from_slice(&state.bytes)?;

`DefineStateChange` registers a command processor component.

message DefineStateChangeRequest {
string database_name = 1;
StateChangeDefinition definition = 2;
}
message StateChangeDefinition {
string name = 1;
string description = 2;
string state_view_name = 3;
bytes wasm_component = 4;
}

`ProcessStateChange` executes a command and returns the resulting events.

message ProcessStateChangeRequest {
string database_name = 1;
string state_change_name = 2;
Command command = 3;
map<string, string> parameters = 4;
}
message ProcessStateChangeResponse {
oneof result {
Success success = 1;
Error error = 2;
}
}
message Success {
repeated Event events = 1;
uint64 revision = 2;
}

Example:

let command_json = r#"{
"type": "CreateOrder",
"customerId": "CUST-123",
"items": [...]
}"#;
let command = Command {
content_type: "application/json".into(),
bytes: command_json.as_bytes().to_vec(),
};
let request = ProcessStateChangeRequest {
database_name: "my-events".into(),
state_change_name: "process-order".into(),
command: Some(command),
parameters: HashMap::new(),
};
let response = client.process_state_change(request).await?.into_inner();
match response.result {
Some(process_state_change_response::Result::Success(success)) => {
println!("Created {} events at revision {}", success.events.len(), success.revision);
}
Some(process_state_change_response::Result::Error(error)) => {
println!("Error {}: {}", error.code, error.message);
}
None => {}
}

Common status codes returned:

| Code | Description | Example |
| --- | --- | --- |
| OK | Success | Normal operation |
| INVALID_ARGUMENT | Bad request | Invalid event format |
| NOT_FOUND | Resource missing | Database doesn’t exist |
| ALREADY_EXISTS | Duplicate | Database name taken |
| FAILED_PRECONDITION | Constraint violation | Revision mismatch |
| RESOURCE_EXHAUSTED | Limit exceeded | Transaction too large |
| UNAVAILABLE | Service down | Temporary outage |
use tonic::{Code, Status};
match client.transact(request).await {
Ok(response) => {
println!("Success: revision {}", response.into_inner().revision);
}
Err(status) => match status.code() {
Code::FailedPrecondition => {
// Constraint violation - retry with updated revision
println!("Constraint failed: {}", status.message());
}
Code::InvalidArgument => {
// Bad request - fix and retry
println!("Invalid request: {}", status.message());
}
_ => return Err(status.into()),
},
}
use tonic::transport::Channel;
// Reuse channels across requests - Channel is Clone and cheap to clone
let channel = Channel::from_static("http://localhost:8080")
.connect()
.await?;
// Clone the channel for concurrent use
let client1 = EvidentSourceClient::new(channel.clone());
let client2 = EvidentSourceClient::new(channel.clone());
// Stream events as they arrive
let mut stream = client.query_events(request).await?.into_inner();
while let Some(result) = stream.message().await? {
// Process each event as it arrives
process_event(result).await?;
}
// Combine multiple events in one transaction
let events: Vec<Event> = (1..=100)
.map(|i| create_event(i))
.collect();
let transaction = Transaction {
events,
constraints: vec![],
};
// Single network round trip
client.transact(TransactionRequest {
database_name: "my-events".into(),
transaction: Some(transaction),
}).await?;
use std::time::Duration;
let channel = Channel::from_static("http://localhost:8080")
.timeout(Duration::from_secs(30))
.connect()
.await?;
// Configure retry with tower middleware
use tower::ServiceBuilder;
let channel = Channel::from_static("http://localhost:8080")
.connect()
.await?;
// Use tower-retry or custom retry logic
use tonic::codec::CompressionEncoding;
let mut client = EvidentSourceClient::new(channel)
.send_compressed(CompressionEncoding::Gzip)
.accept_compressed(CompressionEncoding::Gzip);