Skip to content

gRPC API

The gRPC API provides high-performance, type-safe access to all EvidentSource functionality. It’s the recommended API for production applications and client libraries.

The complete service is defined in service.proto:

service EvidentSource {
// Database operations
rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse);
rpc DeleteDatabase(DeleteDatabaseRequest) returns (DeleteDatabaseResponse);
rpc FetchCatalog(FetchCatalogRequest) returns (FetchCatalogResponse);
// Event operations
rpc Transact(TransactionRequest) returns (TransactionReply);
rpc QueryEvents(QueryEventsRequest) returns (stream Event);
rpc ScanDatabaseLog(ScanDatabaseLogRequest) returns (stream Event);
// Database state
rpc FetchLatestDatabase(FetchLatestDatabaseRequest) returns (Database);
rpc FetchDatabaseAtRevision(FetchDatabaseAtRevisionRequest) returns (Database);
rpc DatabaseEffectiveAtTimestamp(DatabaseEffectiveAtTimestampRequest) returns (Database);
// State views
rpc DefineStateView(DefineStateViewRequest) returns (DefineStateViewResponse);
rpc FetchStateView(FetchStateViewRequest) returns (State);
rpc FetchStateViewAtRevision(FetchStateViewAtRevisionRequest) returns (State);
// State changes
rpc DefineStateChange(DefineStateChangeRequest) returns (DefineStateChangeResponse);
rpc ProcessStateChange(ProcessStateChangeRequest) returns (ProcessStateChangeResponse);
}
// Kotlin/Java
import io.grpc.ManagedChannelBuilder
import com.evidentdb.grpc.EvidentSourceGrpc
val channel = ManagedChannelBuilder
.forAddress("localhost", 8080)
.usePlaintext() // For development only
.build()
val client = EvidentSourceGrpc.newBlockingStub(channel)
# Python
import grpc
import evidentsource_pb2_grpc as evident_grpc
channel = grpc.insecure_channel('localhost:8080')
client = evident_grpc.EvidentSourceStub(channel)
// With TLS
val channel = ManagedChannelBuilder
.forAddress("api.example.com", 443)
.useTransportSecurity()
.build()

Creates a new event database.

message CreateDatabaseRequest {
string database_name = 1;
}
message CreateDatabaseResponse {
Database database = 1;
}

Example:

val request = CreateDatabaseRequest.newBuilder()
.setDatabaseName("my-events")
.build()
val response = client.createDatabase(request)
println("Created database: ${response.database.name}")

Errors:

  • ALREADY_EXISTS: Database name already taken
  • INVALID_ARGUMENT: Invalid database name format

Permanently deletes a database and all its events.

message DeleteDatabaseRequest {
string database_name = 1;
}
message DeleteDatabaseResponse {
// Empty
}

Example:

val request = DeleteDatabaseRequest.newBuilder()
.setDatabaseName("old-events")
.build()
client.deleteDatabase(request)

Lists all databases.

message FetchCatalogRequest {
// Empty
}
message FetchCatalogResponse {
repeated Database databases = 1;
}

Example:

val catalog = client.fetchCatalog(FetchCatalogRequest.getDefaultInstance())
catalog.databasesList.forEach { db ->
println("Database: ${db.name}, Events: ${db.latestRevision}")
}

Atomically writes a transaction of events.

message TransactionRequest {
string database_name = 1;
Transaction transaction = 2;
}
message Transaction {
repeated Event events = 1;
repeated Constraint constraints = 2;
}
message TransactionReply {
uint64 revision = 1;
}

Example:

val event = Event.newBuilder()
.setId(UUID.randomUUID().toString())
.setSpecVersion("1.0")
.setType("OrderCreated")
.setSource("order-service")
.setSubject("order-123")
.setTime(Timestamp.newBuilder().setSeconds(Instant.now().epochSecond))
.setData(Any.pack(orderData))
.build()
val transaction = Transaction.newBuilder()
.addEvents(event)
.addConstraints(
Constraint.newBuilder()
.setMinRevision(
MinRevisionConstraint.newBuilder()
.setRevision(lastKnownRevision)
)
)
.build()
val request = TransactionRequest.newBuilder()
.setDatabaseName("my-events")
.setTransaction(transaction)
.build()
val response = client.transact(request)
println("Events stored at revision: ${response.revision}")

Queries events with flexible filtering.

message QueryEventsRequest {
string database_name = 1;
EventQuery query = 2;
}
message EventQuery {
EventSelection event_selection = 1;
RevisionRange revision_range = 2;
EffectiveTimeRange effective_time_range = 3;
Direction direction = 4;
uint32 limit = 5;
}

Example:

val query = EventQuery.newBuilder()
.setEventSelection(
EventSelection.newBuilder()
.addSelectors(
EventSelector.newBuilder()
.setStream("order-service")
.setEventType("OrderCreated")
)
)
.setDirection(Direction.FORWARD)
.setLimit(100)
.build()
val request = QueryEventsRequest.newBuilder()
.setDatabaseName("my-events")
.setQuery(query)
.build()
// Stream results
client.queryEvents(request).forEach { event ->
println("Event ${event.id}: ${event.type}")
}

Scans all events in revision order.

message ScanDatabaseLogRequest {
string database_name = 1;
uint64 start_revision = 2;
Direction direction = 3;
uint32 limit = 4;
}

Example:

val request = ScanDatabaseLogRequest.newBuilder()
.setDatabaseName("my-events")
.setStartRevision(1)
.setDirection(Direction.FORWARD)
.build()
client.scanDatabaseLog(request).forEach { event ->
val revision = event.extensionsMap["revision"]
println("Revision $revision: ${event.type}")
}

Ensures the database hasn’t changed since the last read.

message MinRevisionConstraint {
uint64 revision = 1;
EventSelector selector = 2; // Optional
}

Ensures events are appended in order.

message MaxRevisionConstraint {
uint64 revision = 1;
EventSelector selector = 2; // Optional
}

Ensures a specific revision range exists.

message RevisionRangeConstraint {
uint64 start = 1;
uint64 end = 2;
EventSelector selector = 3; // Optional
}

Registers a new State View component.

message DefineStateViewRequest {
string database_name = 1;
StateViewDefinition definition = 2;
}
message StateViewDefinition {
string name = 1;
string description = 2;
EventSelection event_selection = 3;
bytes wasm_component = 4;
Priority priority = 5;
string content_type = 6;
string schema_id = 7;
}

Example:

val definition = StateViewDefinition.newBuilder()
.setName("order-summary")
.setDescription("Summary of orders by status")
.setEventSelection(
EventSelection.newBuilder()
.addSelectors(
EventSelector.newBuilder()
.setEventType("OrderCreated")
)
)
.setWasmComponent(ByteString.copyFrom(wasmBytes))
.setPriority(Priority.HIGH)
.setContentType("application/json")
.build()
val request = DefineStateViewRequest.newBuilder()
.setDatabaseName("my-events")
.setDefinition(definition)
.build()
client.defineStateView(request)

Retrieves current state from a State View.

message FetchStateViewRequest {
string database_name = 1;
string state_view_name = 2;
map<string, string> parameters = 3;
}

Example:

val request = FetchStateViewRequest.newBuilder()
.setDatabaseName("my-events")
.setStateViewName("customer-profile")
.putParameters("customer_id", "CUST-123")
.build()
val state = client.fetchStateView(request)
val profile = CustomerProfile.parseFrom(state.bytes)

Registers a command processor component.

message DefineStateChangeRequest {
string database_name = 1;
StateChangeDefinition definition = 2;
}
message StateChangeDefinition {
string name = 1;
string description = 2;
string state_view_name = 3;
bytes wasm_component = 4;
}

Executes a command and returns events.

message ProcessStateChangeRequest {
string database_name = 1;
string state_change_name = 2;
Command command = 3;
map<string, string> parameters = 4;
}
message ProcessStateChangeResponse {
oneof result {
Success success = 1;
Error error = 2;
}
}
message Success {
repeated Event events = 1;
uint64 revision = 2;
}

Example:

val command = Command.newBuilder()
.setContentType("application/json")
.setBytes(ByteString.copyFromUtf8("""
{
"type": "CreateOrder",
"customerId": "CUST-123",
"items": [...]
}
"""))
.build()
val request = ProcessStateChangeRequest.newBuilder()
.setDatabaseName("my-events")
.setStateChangeName("process-order")
.setCommand(command)
.build()
// Keep the whole response in a named value: the branches below read its
// `success` / `error` payloads, not just the oneof discriminator.
val response = client.processStateChange(request)
when (response.resultCase) {
    SUCCESS -> {
        val success = response.success
        println("Created ${success.eventsCount} events at revision ${success.revision}")
    }
    ERROR -> {
        val error = response.error
        println("Error ${error.code}: ${error.message}")
    }
    // The `result` oneof may be unset; report that instead of silently doing nothing.
    else -> println("State change returned no result")
}

Common status codes returned:

| Code | Description | Example |
|------|-------------|---------|
| OK | Success | Normal operation |
| INVALID_ARGUMENT | Bad request | Invalid event format |
| NOT_FOUND | Resource missing | Database doesn’t exist |
| ALREADY_EXISTS | Duplicate | Database name taken |
| FAILED_PRECONDITION | Constraint violation | Revision mismatch |
| RESOURCE_EXHAUSTED | Limit exceeded | Transaction too large |
| UNAVAILABLE | Service down | Temporary outage |
// Submit the transaction and, if the call fails, branch on the gRPC status code.
try {
client.transact(request)
} catch (e: StatusRuntimeException) {
// The status code carried by the exception selects the recovery path.
when (e.status.code) {
Status.Code.FAILED_PRECONDITION -> {
// Constraint violation - retry with updated revision
val details = e.status.description
println("Constraint failed: $details")
}
Status.Code.INVALID_ARGUMENT -> {
// Bad request - fix and retry
println("Invalid request: ${e.status.description}")
}
// Any other status is not handled here and is rethrown unchanged.
else -> throw e
}
}
// Reuse channels across requests
val channel = ManagedChannelBuilder
    .forAddress("localhost", 8080)
    .usePlaintext()
    .maxInboundMessageSize(10 * 1024 * 1024) // 10MB
    .build()
// Create once, reuse many times.
// The generated stub class is EvidentSourceGrpc, matching the `EvidentSource`
// service definition and every other example on this page.
val client = EvidentSourceGrpc.newBlockingStub(channel)
// Use async stub for large queries
val asyncClient = EvidentSourceGrpc.newStub(channel)
// Observer whose callbacks receive the streamed events, a failure, or completion.
val responseObserver = object : StreamObserver<Event> {
override fun onNext(event: Event) {
// Process each event as it arrives
processEvent(event)
}
override fun onError(t: Throwable) {
// The failure is only logged here; nothing is rethrown.
logger.error("Stream error", t)
}
override fun onCompleted() {
logger.info("Stream completed")
}
}
// Start the query; results are delivered through responseObserver.
asyncClient.queryEvents(request, responseObserver)
// Combine multiple events in one transaction
val transaction = Transaction.newBuilder().apply {
    for (i in 1..100) {
        addEvents(createEvent(i))
    }
}.build()
// Wrap the batch in a request so the transaction built above is what gets sent.
val request = TransactionRequest.newBuilder()
    .setDatabaseName("my-events")
    .setTransaction(transaction)
    .build()
// Single network round trip
client.transact(request)
val client = EvidentSourceGrpc.newBlockingStub(channel)
// withDeadlineAfter fixes the deadline relative to the moment it is called,
// so apply it per call. A stub stored with a deadline would fail every call
// made more than 30 seconds after it was created.
val response = client
    .withDeadlineAfter(30, TimeUnit.SECONDS)
    .transact(request)
val channel = ManagedChannelBuilder
.forAddress("localhost", 8080)
.enableRetry()
.maxRetryAttempts(3)
.build()
val client = EvidentSourceGrpc.newBlockingStub(channel)
.withCompression("gzip")