Skip to content

Querying Events

EvidentSource provides powerful query capabilities for accessing events with flexible filtering, ordering, and streaming support.

The query API supports:

  • Filtering by stream, subject, and event type
  • Transaction-time and effective-time ordering
  • Forward and reverse iteration
  • Streaming large result sets
  • Consistent reads at specific revisions
rpc QueryEvents(EventQueryRequest) returns (stream EventQueryReply) {}
message EventQueryRequest {
string database_name = 1;
uint64 revision = 2;
bool include_event_detail = 3;
DatabaseQuery query = 4;
}
GET /api/v1/db/{database}/{revision}/events?stream=order-service&type=OrderCreated&limit=100
use evidentsource_domain::query::{DatabaseQuery, EventSelector};
use tokio_stream::StreamExt;
let query = DatabaseQuery::new()
.with_selector(EventSelector::new()
.with_stream("order-service")
.with_type("OrderCreated"))
.with_limit(100);
let mut events = app.query_events("my-events", 100, &query);
while let Some(event) = events.next().await {
let event = event?;
println!("Event: {} at revision {}", event.id(), event.revision());
}

Filter events by multiple criteria:

// By stream (source)
EventSelector::new().with_stream("order-service")
// By subject (aggregate)
EventSelector::new().with_subject("order-123")
// By event type
EventSelector::new().with_type("OrderCreated")
// Combine criteria (AND semantics)
EventSelector::new()
.with_stream("order-service")
.with_type("OrderCreated")
.with_subject("order-123")

Use multiple selectors for OR semantics:

let query = DatabaseQuery::new()
.with_selector(EventSelector::new()
.with_type("OrderCreated"))
.with_selector(EventSelector::new()
.with_type("OrderUpdated"))
.with_selector(EventSelector::new()
.with_type("OrderCancelled"));

Query specific revision ranges:

use evidentsource_domain::query::RevisionRange;
// Events from revision 10 to 20
let query = DatabaseQuery::new()
.with_revision_range(RevisionRange::new(10, 20));
// Events from revision 50 onward
let query = DatabaseQuery::new()
.with_revision_range(RevisionRange::from(50));
// Events before revision 100
let query = DatabaseQuery::new()
.with_revision_range(RevisionRange::to(100));

Query by effective time (when events occurred):

use chrono::{Utc, Duration};
let start = Utc::now() - Duration::hours(24);
let end = Utc::now();
let query = DatabaseQuery::new()
.with_effective_time_range(start, end)
.with_temporality(QueryTemporality::EffectiveTime);

By default, events are ordered by transaction time, i.e. when they were stored:

let query = DatabaseQuery::new()
.with_temporality(QueryTemporality::TransactionTime)
.with_direction(Direction::Forward);

Order by effective time, i.e. when the events actually occurred:

let query = DatabaseQuery::new()
.with_temporality(QueryTemporality::EffectiveTime)
.with_direction(Direction::Reverse);
  • Forward: Oldest to newest (ascending)
  • Reverse: Newest to oldest (descending)
let query = DatabaseQuery::new()
.with_limit(100); // Return at most 100 events
// First page
let query = DatabaseQuery::new()
.with_limit(20);
let mut last_revision = 0;
let mut events = app.query_events("my-events", 100, &query);
while let Some(event) = events.next().await {
let event = event?;
last_revision = event.revision();
// Process event
}
// Next page
let query = DatabaseQuery::new()
.with_revision_range(RevisionRange::from(last_revision + 1))
.with_limit(20);
let event = app.fetch_event_by_id(
"my-events",
100, // Query at revision 100
"order-service", // Stream name
"evt-123-456" // Event ID
).await?;
println!("Found event: {} of type {}", event.id(), event.event_type());

Fetch multiple events by their revision numbers:

let revisions = vec![10, 20, 30, 40, 50];
let events = app.fetch_events_by_revisions(
"my-events",
&revisions
).await?;
for event in events {
println!("Event at revision {}: {}", event.revision(), event.id());
}

Iterate through all events in revision order:

rpc ScanDatabaseLog(LogScanRequest) returns (stream DatabaseLogReply) {}
message LogScanRequest {
string database_name = 1;
uint64 start_at_revision = 2;
bool include_event_detail = 3;
}
use tokio_stream::StreamExt;
let mut log = app.scan_database_log("my-events", 1);
while let Some(batch) = log.next().await {
let batch = batch?;
println!("Batch at revision {}: {} events",
batch.revision(), batch.event_count());
}

List unique values in indexes:

let mut streams = app.list_streams("my-events", 100);
while let Some(stream) = streams.next().await {
let stream = stream?;
println!("Stream: {}", stream);
}
let mut subjects = app.list_subjects("my-events", 100);
while let Some(subject) = subjects.next().await {
let subject = subject?;
println!("Subject: {}", subject);
}
let mut types = app.list_event_types("my-events", 100);
while let Some(event_type) = types.next().await {
let event_type = event_type?;
println!("Event Type: {}", event_type);
}
let query = DatabaseQuery::new()
.with_selector(EventSelector::new()
.with_subject("order-123"));
let events: Vec<Event> = app.query_events("my-events", latest_revision, &query)
.collect()
.await?;
use chrono::Duration;
let query = DatabaseQuery::new()
.with_selector(EventSelector::new()
.with_type("OrderCreated"))
.with_effective_time_range(
Utc::now() - Duration::hours(1),
Utc::now()
)
.with_temporality(QueryTemporality::EffectiveTime)
.with_direction(Direction::Reverse)
.with_limit(10);
// Process all OrderCreated events
let query = DatabaseQuery::new()
.with_selector(EventSelector::new()
.with_type("OrderCreated"));
let mut events = app.query_events("my-events", latest_revision, &query);
while let Some(event) = events.next().await {
let event = event?;
// Deserialize event data
let order: OrderCreatedEvent = serde_json::from_value(event.data())?;
// Process order
process_order(order).await?;
}
// Get all events related to a customer
let query = DatabaseQuery::new()
.with_selector(EventSelector::new()
.with_subject_prefix("customer-456"));
let audit_trail: Vec<Event> = app.query_events(
"my-events",
latest_revision,
&query
).collect().await?;
for event in audit_trail {
println!("{}: {} - {}",
event.time(),
event.event_type(),
event.source());
}
// Good: Specific selector
let query = DatabaseQuery::new()
.with_selector(EventSelector::new()
.with_stream("order-service")
.with_type("OrderCreated"));
// Avoid: Scanning all events
let query = DatabaseQuery::new(); // No selectors
// Process in batches
let mut continuation = None;
loop {
let mut query = DatabaseQuery::new()
.with_selector(selector.clone())
.with_limit(100);
if let Some(rev) = continuation {
query = query.with_revision_range(RevisionRange::from(rev));
}
let events: Vec<Event> = app.query_events("my-events", latest_revision, &query)
.collect()
.await?;
if events.is_empty() {
break;
}
process_batch(&events).await?;
continuation = Some(events.last().unwrap().revision() + 1);
}
// Consistent read at specific revision
let revision = 100;
let events = app.query_events("my-events", revision, &query);
// Latest data (may vary between queries)
let database = app.latest_database("my-events").await?;
let events = app.query_events("my-events", database.revision(), &query);

Common query errors:

| Error | Description | Resolution |
| --- | --- | --- |
| NOT_FOUND | Database doesn’t exist | Verify database name |
| INVALID_ARGUMENT | Invalid query parameters | Check selector syntax |
| RESOURCE_EXHAUSTED | Result set too large | Add limit or pagination |
| DEADLINE_EXCEEDED | Query timeout | Optimize query or increase timeout |
use evidentsource_api::DatabaseQueryError;
match app.query_events("my-events", revision, &query).collect().await {
Ok(events) => process_events(events),
Err(DatabaseQueryError::DatabaseNotFound(_)) => {
eprintln!("Database not found");
}
Err(DatabaseQueryError::InvalidQuery(msg)) => {
eprintln!("Invalid query: {}", msg);
}
Err(e) => {
eprintln!("Query failed: {}", e);
}
}
  1. Use Consistent Reads: Query at specific revisions for consistency
  2. Stream Large Results: Don’t collect entire result sets into memory
  3. Index-Friendly Queries: Use stream, subject, or type filters
  4. Limit Results: Always set reasonable limits
  5. Handle Errors: Implement retry logic for transient failures