Querying Events
Event Query API
Section titled “Event Query API”EvidentSource provides powerful query capabilities for accessing events with flexible filtering, ordering, and streaming support.
Overview
Section titled “Overview”The query API supports:
- Filtering by stream, subject, and event type
- Transaction-time and effective-time ordering
- Forward and reverse iteration
- Streaming large result sets
- Consistent reads at specific revisions
Query Events
Section titled “Query Events”gRPC API
Section titled “gRPC API”rpc QueryEvents(EventQueryRequest) returns (stream EventQueryReply) {}
message EventQueryRequest { string database_name = 1; uint64 revision = 2; bool include_event_detail = 3; DatabaseQuery query = 4;}HTTP API
Section titled “HTTP API”GET /api/v1/db/{database}/{revision}/events?stream=order-service&type=OrderCreated&limit=100Rust API
Section titled “Rust API”use evidentsource_domain::query::{DatabaseQuery, EventSelector};use tokio_stream::StreamExt;
let query = DatabaseQuery::new() .with_selector(EventSelector::new() .with_stream("order-service") .with_type("OrderCreated")) .with_limit(100);
let mut events = app.query_events("my-events", 100, &query);while let Some(event) = events.next().await { let event = event?; println!("Event: {} at revision {}", event.id(), event.revision());}Query Building
Section titled “Query Building”Event Selectors
Section titled “Event Selectors”Filter events by multiple criteria:
// By stream (source)EventSelector::new().with_stream("order-service")
// By subject (aggregate)EventSelector::new().with_subject("order-123")
// By event typeEventSelector::new().with_type("OrderCreated")
// Combine criteria (AND semantics)EventSelector::new() .with_stream("order-service") .with_type("OrderCreated") .with_subject("order-123")Multiple Selectors
Section titled “Multiple Selectors”Use multiple selectors for OR semantics:
let query = DatabaseQuery::new() .with_selector(EventSelector::new() .with_type("OrderCreated")) .with_selector(EventSelector::new() .with_type("OrderUpdated")) .with_selector(EventSelector::new() .with_type("OrderCancelled"));Revision Ranges
Section titled “Revision Ranges”Query specific revision ranges:
use evidentsource_domain::query::RevisionRange;
// Events from revision 10 to 20let query = DatabaseQuery::new() .with_revision_range(RevisionRange::new(10, 20));
// Events after revision 50let query = DatabaseQuery::new() .with_revision_range(RevisionRange::from(50));
// Events before revision 100let query = DatabaseQuery::new() .with_revision_range(RevisionRange::to(100));Time-Based Queries
Section titled “Time-Based Queries”Query by effective time (when events occurred):
use chrono::{Utc, Duration};
let start = Utc::now() - Duration::hours(24);let end = Utc::now();
let query = DatabaseQuery::new() .with_effective_time_range(start, end) .with_temporality(QueryTemporality::EffectiveTime);Ordering and Direction
Section titled “Ordering and Direction”Transaction Time Ordering
Section titled “Transaction Time Ordering”Default ordering by when events were stored:
let query = DatabaseQuery::new() .with_temporality(QueryTemporality::TransactionTime) .with_direction(Direction::Forward);Effective Time Ordering
Section titled “Effective Time Ordering”Order by when events actually occurred:
let query = DatabaseQuery::new() .with_temporality(QueryTemporality::EffectiveTime) .with_direction(Direction::Reverse);Direction
Section titled “Direction”Forward: Oldest to newest (ascending)Reverse: Newest to oldest (descending)
Pagination
Section titled “Pagination”Limit Results
Section titled “Limit Results”let query = DatabaseQuery::new() .with_limit(100); // Return at most 100 eventsCursor-Based Pagination
Section titled “Cursor-Based Pagination”// First pagelet query = DatabaseQuery::new() .with_limit(20);
let mut last_revision = 0;let mut events = app.query_events("my-events", 100, &query);while let Some(event) = events.next().await { let event = event?; last_revision = event.revision(); // Process event}
// Next pagelet query = DatabaseQuery::new() .with_revision_range(RevisionRange::from(last_revision + 1)) .with_limit(20);Fetching Specific Events
Section titled “Fetching Specific Events”By Event ID
Section titled “By Event ID”let event = app.fetch_event_by_id( "my-events", 100, // Query at revision 100 "order-service", // Stream name "evt-123-456" // Event ID).await?;
println!("Found event: {} of type {}", event.id(), event.event_type());By Revisions
Section titled “By Revisions”Fetch multiple events by their revision numbers:
let revisions = vec![10, 20, 30, 40, 50];let events = app.fetch_events_by_revisions( "my-events", &revisions).await?;
for event in events { println!("Event at revision {}: {}", event.revision(), event.id());}Scanning Database Log
Section titled “Scanning Database Log”Iterate through all events in revision order:
gRPC API
Section titled “gRPC API”rpc ScanDatabaseLog(LogScanRequest) returns (stream DatabaseLogReply) {}
message LogScanRequest { string database_name = 1; uint64 start_at_revision = 2; bool include_event_detail = 3;}Rust API
Section titled “Rust API”use tokio_stream::StreamExt;
let mut log = app.scan_database_log("my-events", 1);while let Some(batch) = log.next().await { let batch = batch?; println!("Batch at revision {}: {} events", batch.revision(), batch.event_count());}Index Scanning
Section titled “Index Scanning”List unique values in indexes:
List Streams
Section titled “List Streams”let mut streams = app.list_streams("my-events", 100);while let Some(stream) = streams.next().await { let stream = stream?; println!("Stream: {}", stream);}List Subjects
Section titled “List Subjects”let mut subjects = app.list_subjects("my-events", 100);while let Some(subject) = subjects.next().await { let subject = subject?; println!("Subject: {}", subject);}List Event Types
Section titled “List Event Types”let mut types = app.list_event_types("my-events", 100);while let Some(event_type) = types.next().await { let event_type = event_type?; println!("Event Type: {}", event_type);}Query Examples
Section titled “Query Examples”Get All Events for an Aggregate
Section titled “Get All Events for an Aggregate”let query = DatabaseQuery::new() .with_selector(EventSelector::new() .with_subject("order-123"));
let events: Vec<Event> = app.query_events("my-events", latest_revision, &query) .collect() .await?;Get Recent Events by Type
Section titled “Get Recent Events by Type”use chrono::Duration;
let query = DatabaseQuery::new() .with_selector(EventSelector::new() .with_type("OrderCreated")) .with_effective_time_range( Utc::now() - Duration::hours(1), Utc::now() ) .with_temporality(QueryTemporality::EffectiveTime) .with_direction(Direction::Reverse) .with_limit(10);Stream Processing Pattern
Section titled “Stream Processing Pattern”// Process all OrderCreated eventslet query = DatabaseQuery::new() .with_selector(EventSelector::new() .with_type("OrderCreated"));
let mut events = app.query_events("my-events", latest_revision, &query);while let Some(event) = events.next().await { let event = event?;
// Deserialize event data let order: OrderCreatedEvent = serde_json::from_value(event.data())?;
// Process order process_order(order).await?;}Audit Trail Query
Section titled “Audit Trail Query”// Get all events related to a customerlet query = DatabaseQuery::new() .with_selector(EventSelector::new() .with_subject_prefix("customer-456"));
let audit_trail: Vec<Event> = app.query_events( "my-events", latest_revision, &query).collect().await?;
for event in audit_trail { println!("{}: {} - {}", event.time(), event.event_type(), event.source());}Performance Optimization
Section titled “Performance Optimization”Use Specific Selectors
Section titled “Use Specific Selectors”// Good: Specific selectorlet query = DatabaseQuery::new() .with_selector(EventSelector::new() .with_stream("order-service") .with_type("OrderCreated"));
// Avoid: Scanning all eventslet query = DatabaseQuery::new(); // No selectorsLimit Result Sets
Section titled “Limit Result Sets”// Process in batcheslet mut continuation = None;loop { let query = DatabaseQuery::new() .with_selector(selector.clone()) .with_limit(100);
if let Some(rev) = continuation { query = query.with_revision_range(RevisionRange::from(rev)); }
let events: Vec<Event> = app.query_events("my-events", latest_revision, &query) .collect() .await?;
if events.is_empty() { break; }
process_batch(&events).await?; continuation = Some(events.last().unwrap().revision() + 1);}Choose Appropriate Revision
Section titled “Choose Appropriate Revision”// Consistent read at specific revisionlet revision = 100;let events = app.query_events("my-events", revision, &query);
// Latest data (may vary between queries)let database = app.latest_database("my-events").await?;let events = app.query_events("my-events", database.revision(), &query);Error Handling
Section titled “Error Handling”Common query errors:
| Error | Description | Resolution |
|---|---|---|
NOT_FOUND | Database doesn’t exist | Verify database name |
INVALID_ARGUMENT | Invalid query parameters | Check selector syntax |
RESOURCE_EXHAUSTED | Result set too large | Add limit or pagination |
DEADLINE_EXCEEDED | Query timeout | Optimize query or increase timeout |
use evidentsource_api::DatabaseQueryError;
match app.query_events("my-events", revision, &query).collect().await { Ok(events) => process_events(events), Err(DatabaseQueryError::DatabaseNotFound(_)) => { eprintln!("Database not found"); } Err(DatabaseQueryError::InvalidQuery(msg)) => { eprintln!("Invalid query: {}", msg); } Err(e) => { eprintln!("Query failed: {}", e); }}Best Practices
Section titled “Best Practices”- Use Consistent Reads: Query at specific revisions for consistency
- Stream Large Results: Don’t collect entire result sets into memory
- Index-Friendly Queries: Use stream, subject, or type filters
- Limit Results: Always set reasonable limits
- Handle Errors: Implement retry logic for transient failures
Next Steps
Section titled “Next Steps”- Learn about Bi-Temporal Indexing for efficient queries
- Implement State Views for materialized queries
- Explore Constraints for consistency
- Build State Changes for command processing