gRPC API
The gRPC API provides high-performance, type-safe access to all EvidentSource functionality. It’s the recommended API for production applications and client libraries.
Service Definition
Section titled “Service Definition”The complete service is defined in service.proto:
service EvidentSource { // Database operations rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse); rpc DeleteDatabase(DeleteDatabaseRequest) returns (DeleteDatabaseResponse); rpc FetchCatalog(FetchCatalogRequest) returns (FetchCatalogResponse);
// Event operations rpc Transact(TransactionRequest) returns (TransactionReply); rpc QueryEvents(QueryEventsRequest) returns (stream Event); rpc ScanDatabaseLog(ScanDatabaseLogRequest) returns (stream Event);
// Database state rpc FetchLatestDatabase(FetchLatestDatabaseRequest) returns (Database); rpc FetchDatabaseAtRevision(FetchDatabaseAtRevisionRequest) returns (Database); rpc DatabaseEffectiveAtTimestamp(DatabaseEffectiveAtTimestampRequest) returns (Database);
// State views rpc DefineStateView(DefineStateViewRequest) returns (DefineStateViewResponse); rpc FetchStateView(FetchStateViewRequest) returns (State); rpc FetchStateViewAtRevision(FetchStateViewAtRevisionRequest) returns (State);
// State changes rpc DefineStateChange(DefineStateChangeRequest) returns (DefineStateChangeResponse); rpc ProcessStateChange(ProcessStateChangeRequest) returns (ProcessStateChangeResponse);}Connection Setup
Section titled “Connection Setup”Basic Connection
Section titled “Basic Connection”
// Kotlin/Java
import io.grpc.ManagedChannelBuilder
import com.evidentdb.grpc.EvidentSourceGrpc

val channel = ManagedChannelBuilder
    .forAddress("localhost", 8080)
    .usePlaintext() // For development only
    .build()

val client = EvidentSourceGrpc.newBlockingStub(channel)

# Python
import grpc
import evidentsource_pb2_grpc as evident_grpc

channel = grpc.insecure_channel('localhost:8080')
client = evident_grpc.EvidentSourceStub(channel)
Secure Connection
Section titled “Secure Connection”// With TLSval channel = ManagedChannelBuilder .forAddress("api.example.com", 443) .useTransportSecurity() .build()Database Operations
Section titled “Database Operations”Create Database
Section titled “Create Database”Creates a new event database.
message CreateDatabaseRequest { string database_name = 1;}
message CreateDatabaseResponse { Database database = 1;}Example:
val request = CreateDatabaseRequest.newBuilder() .setDatabaseName("my-events") .build()
val response = client.createDatabase(request)println("Created database: ${response.database.name}")Errors:
- ALREADY_EXISTS: Database name already taken
- INVALID_ARGUMENT: Invalid database name format
Delete Database
Section titled “Delete Database”Permanently deletes a database and all its events.
message DeleteDatabaseRequest { string database_name = 1;}
message DeleteDatabaseResponse { // Empty}Example:
val request = DeleteDatabaseRequest.newBuilder() .setDatabaseName("old-events") .build()
client.deleteDatabase(request)Fetch Catalog
Section titled “Fetch Catalog”Lists all databases.
message FetchCatalogRequest { // Empty}
message FetchCatalogResponse { repeated Database databases = 1;}Example:
val catalog = client.fetchCatalog(FetchCatalogRequest.getDefaultInstance())catalog.databasesList.forEach { db -> println("Database: ${db.name}, Events: ${db.latestRevision}")}Event Operations
Section titled “Event Operations”Transact
Section titled “Transact”Atomically writes a transaction of events.
message TransactionRequest { string database_name = 1; Transaction transaction = 2;}
message Transaction { repeated Event events = 1; repeated Constraint constraints = 2;}
message TransactionReply { uint64 revision = 1;}Example:
val event = Event.newBuilder() .setId(UUID.randomUUID().toString()) .setSpecVersion("1.0") .setType("OrderCreated") .setSource("order-service") .setSubject("order-123") .setTime(Timestamp.newBuilder().setSeconds(Instant.now().epochSecond)) .setData(Any.pack(orderData)) .build()
val transaction = Transaction.newBuilder() .addEvents(event) .addConstraints( Constraint.newBuilder() .setMinRevision( MinRevisionConstraint.newBuilder() .setRevision(lastKnownRevision) ) ) .build()
val request = TransactionRequest.newBuilder() .setDatabaseName("my-events") .setTransaction(transaction) .build()
val response = client.transact(request)println("Events stored at revision: ${response.revision}")Query Events
Section titled “Query Events”Queries events with flexible filtering.
message QueryEventsRequest { string database_name = 1; EventQuery query = 2;}
message EventQuery { EventSelection event_selection = 1; RevisionRange revision_range = 2; EffectiveTimeRange effective_time_range = 3; Direction direction = 4; uint32 limit = 5;}Example:
val query = EventQuery.newBuilder() .setEventSelection( EventSelection.newBuilder() .addSelectors( EventSelector.newBuilder() .setStream("order-service") .setEventType("OrderCreated") ) ) .setDirection(Direction.FORWARD) .setLimit(100) .build()
val request = QueryEventsRequest.newBuilder() .setDatabaseName("my-events") .setQuery(query) .build()
// Stream resultsclient.queryEvents(request).forEach { event -> println("Event ${event.id}: ${event.type}")}Scan Database Log
Section titled “Scan Database Log”Scans all events in revision order.
message ScanDatabaseLogRequest { string database_name = 1; uint64 start_revision = 2; Direction direction = 3; uint32 limit = 4;}Example:
val request = ScanDatabaseLogRequest.newBuilder() .setDatabaseName("my-events") .setStartRevision(1) .setDirection(Direction.FORWARD) .build()
client.scanDatabaseLog(request).forEach { event -> val revision = event.extensionsMap["revision"] println("Revision $revision: ${event.type}")}Constraints
Section titled “Constraints”MinRevision Constraint
Section titled “MinRevision Constraint”Ensures the database hasn’t changed since the last read.
message MinRevisionConstraint { uint64 revision = 1; EventSelector selector = 2; // Optional}MaxRevision Constraint
Section titled “MaxRevision Constraint”Ensures events are appended in order.
message MaxRevisionConstraint { uint64 revision = 1; EventSelector selector = 2; // Optional}RevisionRange Constraint
Section titled “RevisionRange Constraint”Ensures a specific revision range exists.
message RevisionRangeConstraint { uint64 start = 1; uint64 end = 2; EventSelector selector = 3; // Optional}State Views
Section titled “State Views”Define State View
Section titled “Define State View”Registers a new State View component.
message DefineStateViewRequest { string database_name = 1; StateViewDefinition definition = 2;}
message StateViewDefinition { string name = 1; string description = 2; EventSelection event_selection = 3; bytes wasm_component = 4; Priority priority = 5; string content_type = 6; string schema_id = 7;}Example:
val definition = StateViewDefinition.newBuilder() .setName("order-summary") .setDescription("Summary of orders by status") .setEventSelection( EventSelection.newBuilder() .addSelectors( EventSelector.newBuilder() .setEventType("OrderCreated") ) ) .setWasmComponent(ByteString.copyFrom(wasmBytes)) .setPriority(Priority.HIGH) .setContentType("application/json") .build()
val request = DefineStateViewRequest.newBuilder() .setDatabaseName("my-events") .setDefinition(definition) .build()
client.defineStateView(request)Fetch State View
Section titled “Fetch State View”Retrieves current state from a State View.
message FetchStateViewRequest { string database_name = 1; string state_view_name = 2; map<string, string> parameters = 3;}Example:
val request = FetchStateViewRequest.newBuilder() .setDatabaseName("my-events") .setStateViewName("customer-profile") .putParameters("customer_id", "CUST-123") .build()
val state = client.fetchStateView(request)val profile = CustomerProfile.parseFrom(state.bytes)State Changes
Section titled “State Changes”Define State Change
Section titled “Define State Change”Registers a command processor component.
message DefineStateChangeRequest { string database_name = 1; StateChangeDefinition definition = 2;}
message StateChangeDefinition { string name = 1; string description = 2; string state_view_name = 3; bytes wasm_component = 4;}Process State Change
Section titled “Process State Change”Executes a command and returns events.
message ProcessStateChangeRequest { string database_name = 1; string state_change_name = 2; Command command = 3; map<string, string> parameters = 4;}
message ProcessStateChangeResponse { oneof result { Success success = 1; Error error = 2; }}
message Success { repeated Event events = 1; uint64 revision = 2;}Example:
val command = Command.newBuilder() .setContentType("application/json") .setBytes(ByteString.copyFromUtf8(""" { "type": "CreateOrder", "customerId": "CUST-123", "items": [...] } """)) .build()
val request = ProcessStateChangeRequest.newBuilder() .setDatabaseName("my-events") .setStateChangeName("process-order") .setCommand(command) .build()
val response = client.processStateChange(request)
when (response.resultCase) {
    SUCCESS -> {
        val success = response.success
        println("Created ${success.eventsCount} events at revision ${success.revision}")
    }
    ERROR -> {
        val error = response.error
        println("Error ${error.code}: ${error.message}")
    }
}
Error Handling
Section titled “Error Handling”gRPC Status Codes
Section titled “gRPC Status Codes”Common status codes returned:
| Code | Description | Example |
|---|---|---|
| OK | Success | Normal operation |
| INVALID_ARGUMENT | Bad request | Invalid event format |
| NOT_FOUND | Resource missing | Database doesn’t exist |
| ALREADY_EXISTS | Duplicate | Database name taken |
| FAILED_PRECONDITION | Constraint violation | Revision mismatch |
| RESOURCE_EXHAUSTED | Limit exceeded | Transaction too large |
| UNAVAILABLE | Service down | Temporary outage |
Error Details
Section titled “Error Details”try { client.transact(request)} catch (e: StatusRuntimeException) { when (e.status.code) { Status.Code.FAILED_PRECONDITION -> { // Constraint violation - retry with updated revision val details = e.status.description println("Constraint failed: $details") } Status.Code.INVALID_ARGUMENT -> { // Bad request - fix and retry println("Invalid request: ${e.status.description}") } else -> throw e }}Performance Tips
Section titled “Performance Tips”Connection Pooling
Section titled “Connection Pooling”// Reuse channels across requestsval channel = ManagedChannelBuilder .forAddress("localhost", 8080) .usePlaintext() .maxInboundMessageSize(10 * 1024 * 1024) // 10MB .build()
// Create once, reuse many times
val client = EvidentSourceGrpc.newBlockingStub(channel)
Streaming Large Results
Section titled “Streaming Large Results”// Use async stub for large queriesval asyncClient = EvidentSourceGrpc.newStub(channel)
val responseObserver = object : StreamObserver<Event> { override fun onNext(event: Event) { // Process each event as it arrives processEvent(event) }
override fun onError(t: Throwable) { logger.error("Stream error", t) }
override fun onCompleted() { logger.info("Stream completed") }}
asyncClient.queryEvents(request, responseObserver)Transaction Operations
Section titled “Transaction Operations”// Combine multiple events in one transactionval transaction = Transaction.newBuilder().apply { for (i in 1..100) { addEvents(createEvent(i)) }}.build()
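val request = TransactionRequest.newBuilder()
    .setDatabaseName("my-events")
    .setTransaction(transaction)
    .build()

// Single network round trip
client.transact(request)
Client Configuration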
Section titled “Client Configuration”Timeouts
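Section titled “Timeouts”
// Note: the deadline is fixed at the moment withDeadlineAfter() is called,
// so apply it per call rather than caching the returned stub for long-term reuse.
val client = EvidentSourceGrpc.newBlockingStub(channel)
    .withDeadlineAfter(30, TimeUnit.SECONDS)
Retry Policy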
Section titled “Retry Policy”val channel = ManagedChannelBuilder .forAddress("localhost", 8080) .enableRetry() .maxRetryAttempts(3) .build()Compression
Section titled “Compression”val client = EvidentSourceGrpc.newBlockingStub(channel) .withCompression("gzip")Next Steps
Section titled “Next Steps”- Explore the REST API for simpler integration
- See Client Libraries for language-specific guides
- Download example applications from the Customer Portal
- Read Performance Tuning for optimization