Kafka Streaming API
The Kafka Streaming API publishes database events (database creation, batch transactions, deletions) to Apache Kafka in real time. Events follow the CloudEvents 1.0 specification for interoperability with standard event streaming tools.
Topic Architecture
EvidentSource uses four Kafka topics for different purposes. Understanding these topics and their access patterns is essential for integration.
Topic Summary
| Topic | Default Name | Access | Purpose |
|---|---|---|---|
| Transaction Proposals | evidentsource-transaction-proposals | Internal | Distributed transaction coordination |
| Database Events | evidentsource-database-events | Public Read | Database lifecycle notifications |
| Async Commands | evidentsource-async-commands | Public Write | Asynchronous command submission |
| Async Responses | evidentsource-async-responses | Public Read | Asynchronous command results |
Internal Topics
Transaction Proposals (evidentsource-transaction-proposals)
This topic is used internally by the Kafka linearizer for distributed transaction coordination between EvidentSource cluster nodes.
Warning: Do not consume from or produce to this topic from external applications. Doing so may corrupt transaction ordering and data consistency.
Public Topics
Database Events (evidentsource-database-events)
Subscribe to this topic to receive notifications when:
- A database is created
- Events are transacted (batches committed)
- A database is deleted
This is the primary integration point for building downstream systems. See Event Types below for message formats.
Async Commands (evidentsource-async-commands)
Produce to this topic to submit commands for asynchronous processing. This is useful for:
- Bulk data ingestion pipelines
- Fire-and-forget command submission
- Decoupling producers from EvidentSource latency
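A minimal producer sketch for this topic is shown below. The exact command payload and required headers are not specified on this page, so treat the Protobuf body and the ce_* headers here as assumptions to verify against the async command reference; kafka-python is used purely for illustration.
```python
import uuid
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

command_id = str(uuid.uuid4())   # keep this ID to correlate the eventual response
command_bytes = b'...'           # placeholder for a Protobuf-encoded command payload

producer.send(
    'evidentsource-async-commands',
    key=b'accounts',             # assumed: target database name as the message key
    value=command_bytes,
    headers=[                    # assumed: CloudEvents binary-mode headers, as used elsewhere in this API
        ('ce_specversion', b'1.0'),
        ('ce_id', command_id.encode('utf-8')),
    ],
)
producer.flush()
```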
Async Responses (evidentsource-async-responses)
Consume from this topic to receive results of asynchronously submitted commands. Messages include:
- ce_correlationid header linking back to the original command
- Success or error status
- Transaction details on success
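A sketch of the consuming side, assuming the ce_correlationid header carries the ce_id you set on the original command (verify the exact contract and payload format against the async command reference):
```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'evidentsource-async-responses',
    bootstrap_servers=['localhost:9092'],
    group_id='async-response-tracker',
)

for message in consumer:
    headers = dict(message.headers)
    correlation_id = headers.get(b'ce_correlationid', b'').decode('utf-8')
    ce_type = headers.get(b'ce_type', b'').decode('utf-8')
    # Match correlation_id against the ce_id values you set when producing commands,
    # then inspect the payload for success/error status and transaction details.
    print(f"Response for command {correlation_id}: {ce_type}")
```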
Topic Provisioning
Topics must be pre-created before deploying EvidentSource. See Kafka Prerequisites for:
- Topic configuration recommendations
- Access control setup (IAM, ACLs)
- AWS MSK integration guide
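For local or proof-of-concept environments, the topics can be created programmatically. The sketch below uses kafka-python's admin client with placeholder partition and replication settings; use the values recommended in Kafka Prerequisites for real deployments.
```python
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Placeholder settings -- follow Kafka Prerequisites for recommended partition
# counts, replication factors, and retention configuration.
names = [
    'evidentsource-transaction-proposals',
    'evidentsource-database-events',
    'evidentsource-async-commands',
    'evidentsource-async-responses',
]
admin.create_topics(new_topics=[
    NewTopic(name=n, num_partitions=6, replication_factor=3) for n in names
])
```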
Overview
EvidentSource can publish DatabaseEvent notifications to Kafka whenever:
- A database is created
- A batch of events is transacted
- A database is deleted
This enables building real-time downstream systems like:
- Materialized views - Update read models in external databases
- Search indexing - Keep Elasticsearch/Solr in sync
- Event-driven workflows - Trigger business processes
- Analytics - Stream events to data warehouses
- Notifications - Send alerts based on event patterns
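As a minimal illustration of the materialized-view pattern, the sketch below maintains an in-memory read model of per-database revisions and event counts; a real projection would write to an external store. It assumes the generated database_pb2 module used in the consumer examples later on this page.
```python
from kafka import KafkaConsumer
import database_pb2  # generated from domain.proto (see Protobuf Schemas below)

# In-memory read model: database name -> {'revision': int, 'events': int}
read_model = {}

consumer = KafkaConsumer(
    'evidentsource-database-events',
    bootstrap_servers=['localhost:9092'],
    group_id='materialized-view-builder',
)

for message in consumer:
    headers = dict(message.headers)
    ce_type = headers[b'ce_type'].decode('utf-8')
    database = message.key.decode('utf-8')

    if ce_type == 'com.evidentsource.batch.transacted.v1':
        batch = database_pb2.BatchSummary()
        batch.ParseFromString(message.value)
        view = read_model.setdefault(database, {'revision': 0, 'events': 0})
        view['revision'] = batch.revision
        view['events'] += batch.event_count
    elif ce_type == 'com.evidentsource.database.deleted.v1':
        read_model.pop(database, None)  # drop projections for deleted databases
```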
CloudEvents Format
All Kafka messages conform to the CloudEvents Kafka Binding specification using binary content mode:
- Metadata is stored in Kafka message headers (prefixed with ce_)
- Event data is stored in the message body (Protobuf-encoded)
- Message key is the database name (ensures ordering per database)
CloudEvents Headers
Every message includes these standard CloudEvents headers:
| Header | Type | Description | Example |
|---|---|---|---|
| ce_specversion | string | CloudEvents version | 1.0 |
| ce_id | string | Unique event ID (UUID) | a3e8f7c2-1234-5678-9abc-def012345678 |
| ce_source | URI | Event source identifier | https://api.example.com/db/accounts |
| ce_type | string | Event type identifier | com.evidentsource.batch.transacted.v1 |
| ce_time | timestamp | Event timestamp (RFC3339) | 2024-01-15T10:30:00.123Z |
| ce_datacontenttype | string | Payload content type | application/protobuf |
| ce_subject | string | Event subject (optional) | Batch ID for transacted events |
Extension Attributes
EvidentSource adds custom extension attributes for revision tracking:
| Header | Type | Present On | Description |
|---|---|---|---|
| ce_revision | uint64 | DatabaseCreated, BatchTransacted | Database revision after this event |
| ce_basis | uint64 | DatabaseCreated, BatchTransacted | Database revision before this event |
These attributes enable consumers to:
- Detect gaps in the event stream
- Track database state progression
- Validate event ordering and continuity
- Skip deserialization when only metadata is needed
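For example, a consumer that only tracks revision progression can read these extension headers and never deserialize the Protobuf payload (a minimal kafka-python sketch):
```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'evidentsource-database-events',
    bootstrap_servers=['localhost:9092'],
    group_id='revision-tracker',
)

# Latest known revision per database, derived from headers alone
latest = {}

for message in consumer:
    headers = dict(message.headers)
    if b'ce_revision' not in headers:
        continue  # DatabaseDeleted events carry no revision headers
    database = message.key.decode('utf-8')
    latest[database] = int(headers[b'ce_revision'].decode('utf-8'))
```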
Event Types
DatabaseCreated
Published when a new database is created.
CloudEvents Headers:
```
ce_specversion: 1.0
ce_id: 550e8400-e29b-41d4-a716-446655440000
ce_source: https://api.example.com/db/accounts
ce_type: com.evidentsource.database.created.v1
ce_time: 2024-01-15T10:30:00.123Z
ce_datacontenttype: application/protobuf
ce_revision: 0
ce_basis: 0
```
Message Key:
```
accounts
```
Message Body (Protobuf):
```protobuf
message Database {
  string name = 1;                                  // "accounts"
  google.protobuf.Timestamp created_at = 2;
  uint64 basis = 3;                                 // 0
  uint64 revision = 4;                              // 0
  google.protobuf.Timestamp revision_timestamp = 5;
}
```
BatchTransacted
Published when a batch of events is successfully transacted.
CloudEvents Headers:
```
ce_specversion: 1.0
ce_id: 7f3e8a1c-9876-5432-1fed-cba987654321
ce_source: https://api.example.com/db/accounts
ce_type: com.evidentsource.batch.transacted.v1
ce_time: 2024-01-15T10:30:05.456Z
ce_datacontenttype: application/protobuf
ce_subject: batch-550e8400-e29b-41d4-a716-446655440000
ce_revision: 150
ce_basis: 145
```
Message Key:
```
accounts
```
Message Body (Protobuf):
```protobuf
message BatchSummary {
  string id = 1;                                // Batch ID (UUID)
  string database = 2;                          // "accounts"
  uint64 basis = 3;                             // Revision before transaction
  uint64 revision = 4;                          // Revision after transaction
  uint32 event_count = 5;                       // Number of events in batch
  google.protobuf.Timestamp recorded_time = 6;
}
```
Note: The ce_subject header contains the batch ID, allowing consumers to filter or route based on specific batches without deserializing the payload.
DatabaseDeleted
Published when a database is permanently deleted.
CloudEvents Headers:
```
ce_specversion: 1.0
ce_id: c4f6b2d8-1111-2222-3333-444455556666
ce_source: https://api.example.com/db/old-database
ce_type: com.evidentsource.database.deleted.v1
ce_time: 2024-01-15T11:00:00.789Z
ce_datacontenttype: application/octet-stream
```
Message Key:
```
old-database
```
Message Body:
```
(empty)
```
Note: No ce_revision, ce_basis, or ce_subject headers are present for deletion events. The payload is empty.
Topic Configuration
Topic Name
By default, all DatabaseEvents are published to:
```
evidentsource-database-events
```
Configure via environment variable or parameter:
```sh
DATABASE_EVENTS_KAFKA_TOPIC=my-custom-topic
```
Partitioning Strategy
Events are partitioned by database name (message key):
- All events for a database go to the same partition
- Guarantees ordering within a database
- Enables parallel processing across databases
Message Ordering
Within a partition (database):
- DatabaseCreated is always first
- BatchTransacted events follow in revision order
- DatabaseDeleted is last (if database is deleted)
Consumer Examples
Python (kafka-python)
```python
from kafka import KafkaConsumer
from google.protobuf.timestamp_pb2 import Timestamp
import database_pb2  # Generated from domain.proto

consumer = KafkaConsumer(
    'evidentsource-database-events',
    bootstrap_servers=['localhost:9092'],
    group_id='my-consumer-group',
    enable_auto_commit=True,
    auto_offset_reset='earliest'
)

for message in consumer:
    # Extract CloudEvents headers
    headers = dict(message.headers)
    ce_type = headers.get(b'ce_type').decode('utf-8')
    ce_source = headers.get(b'ce_source').decode('utf-8')
    ce_time = headers.get(b'ce_time').decode('utf-8')

    # Get database name from message key
    database_name = message.key.decode('utf-8')

    if ce_type == 'com.evidentsource.database.created.v1':
        db = database_pb2.Database()
        db.ParseFromString(message.value)
        print(f"Database created: {db.name} at revision {db.revision}")

    elif ce_type == 'com.evidentsource.batch.transacted.v1':
        # Check revision without deserializing
        ce_revision = int(headers.get(b'ce_revision').decode('utf-8'))
        ce_basis = int(headers.get(b'ce_basis').decode('utf-8'))

        # Deserialize if needed
        batch = database_pb2.BatchSummary()
        batch.ParseFromString(message.value)
        print(f"Batch {batch.id}: {batch.event_count} events, "
              f"revision {batch.basis} -> {batch.revision}")

    elif ce_type == 'com.evidentsource.database.deleted.v1':
        print(f"Database deleted: {database_name}")
```
Java (Spring Kafka)
```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import com.google.protobuf.InvalidProtocolBufferException;

@Service
public class DatabaseEventConsumer {

    @KafkaListener(topics = "evidentsource-database-events", groupId = "my-group")
    public void consume(ConsumerRecord<String, byte[]> record) {
        String ceType = getHeader(record, "ce_type");
        String ceSource = getHeader(record, "ce_source");
        String databaseName = record.key();

        switch (ceType) {
            case "com.evidentsource.database.created.v1":
                handleDatabaseCreated(record.value());
                break;

            case "com.evidentsource.batch.transacted.v1":
                long revision = Long.parseLong(getHeader(record, "ce_revision"));
                long basis = Long.parseLong(getHeader(record, "ce_basis"));
                handleBatchTransacted(record.value(), revision, basis);
                break;

            case "com.evidentsource.database.deleted.v1":
                handleDatabaseDeleted(databaseName);
                break;
        }
    }

    private void handleBatchTransacted(byte[] payload, long revision, long basis) {
        try {
            BatchSummary batch = BatchSummary.parseFrom(payload);
            System.out.printf("Batch %s: %d events, revision %d -> %d%n",
                batch.getId(), batch.getEventCount(), batch.getBasis(), batch.getRevision());
        } catch (InvalidProtocolBufferException e) {
            // Handle error
        }
    }

    private String getHeader(ConsumerRecord<?, ?> record, String key) {
        Header header = record.headers().lastHeader(key);
        return header != null ? new String(header.value()) : null;
    }
}
```
Go (confluent-kafka-go)
```go
package main

import (
    "fmt"
    "log"

    "github.com/confluentinc/confluent-kafka-go/kafka"
    pb "example.com/evidentsource/proto"
    "google.golang.org/protobuf/proto"
)

func main() {
    consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",
        "group.id":          "my-group",
        "auto.offset.reset": "earliest",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    consumer.Subscribe("evidentsource-database-events", nil)

    for {
        msg, err := consumer.ReadMessage(-1)
        if err != nil {
            log.Printf("Error: %v", err)
            continue
        }

        ceType := getHeader(msg, "ce_type")
        databaseName := string(msg.Key)

        switch ceType {
        case "com.evidentsource.database.created.v1":
            var db pb.Database
            if err := proto.Unmarshal(msg.Value, &db); err != nil {
                log.Printf("Parse error: %v", err)
                continue
            }
            fmt.Printf("Database created: %s at revision %d\n", db.Name, db.Revision)

        case "com.evidentsource.batch.transacted.v1":
            revision := getHeader(msg, "ce_revision")
            basis := getHeader(msg, "ce_basis")

            var batch pb.BatchSummary
            if err := proto.Unmarshal(msg.Value, &batch); err != nil {
                log.Printf("Parse error: %v", err)
                continue
            }
            fmt.Printf("Batch %s: %d events, revision %s -> %s\n",
                batch.Id, batch.EventCount, basis, revision)

        case "com.evidentsource.database.deleted.v1":
            fmt.Printf("Database deleted: %s\n", databaseName)
        }
    }
}

func getHeader(msg *kafka.Message, key string) string {
    for _, header := range msg.Headers {
        if header.Key == key {
            return string(header.Value)
        }
    }
    return ""
}
```
Node.js (KafkaJS)
```javascript
const { Kafka } = require('kafkajs');
const protobuf = require('protobufjs');

// Load protobuf definitions
const root = protobuf.loadSync('domain.proto');
const Database = root.lookupType('Database');
const BatchSummary = root.lookupType('BatchSummary');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
});

const consumer = kafka.consumer({ groupId: 'my-group' });

async function run() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'evidentsource-database-events', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const headers = {};
      Object.entries(message.headers).forEach(([key, value]) => {
        headers[key] = value.toString();
      });

      const ceType = headers.ce_type;
      const databaseName = message.key.toString();

      switch (ceType) {
        case 'com.evidentsource.database.created.v1': {
          const db = Database.decode(message.value);
          console.log(`Database created: ${db.name} at revision ${db.revision}`);
          break;
        }
        case 'com.evidentsource.batch.transacted.v1': {
          const revision = parseInt(headers.ce_revision);
          const basis = parseInt(headers.ce_basis);
          const batch = BatchSummary.decode(message.value);
          console.log(`Batch ${batch.id}: ${batch.eventCount} events, ` +
            `revision ${batch.basis} -> ${batch.revision}`);
          break;
        }
        case 'com.evidentsource.database.deleted.v1':
          console.log(`Database deleted: ${databaseName}`);
          break;
      }
    },
  });
}

run().catch(console.error);
```
Consumer Patterns
Gap Detection
Use ce_revision and ce_basis to detect missing events:
```python
last_revision = 0

for message in consumer:
    headers = dict(message.headers)
    ce_type = headers.get(b'ce_type').decode('utf-8')

    if ce_type != 'com.evidentsource.database.deleted.v1':
        ce_basis = int(headers.get(b'ce_basis').decode('utf-8'))
        ce_revision = int(headers.get(b'ce_revision').decode('utf-8'))

        if ce_basis != last_revision:
            print(f"GAP DETECTED: Expected basis {last_revision}, got {ce_basis}")
            # Handle gap: rewind, alert, etc.

        last_revision = ce_revision
```
Filtering by Event Type
Use Kafka consumer filters or header-based routing:
```java
@KafkaListener(
    topics = "evidentsource-database-events",
    groupId = "batch-only-consumer",
    containerFactory = "filteringContainerFactory"
)
public void consumeBatchesOnly(ConsumerRecord<String, byte[]> record) {
    String ceType = getHeader(record, "ce_type");
    if ("com.evidentsource.batch.transacted.v1".equals(ceType)) {
        // Process batch transactions only
        handleBatch(record.value());
    }
}
```
Database-Specific Consumers
Consume only specific databases using topic partitioning:
```go
// Subscribe to specific partition for a database
// (requires knowing partition mapping)
consumer.Assign([]kafka.TopicPartition{
    {Topic: &topic, Partition: accountsPartition},
})
```
Or filter after consumption:
```python
for message in consumer:
    database_name = message.key.decode('utf-8')
    if database_name == 'accounts':
        # Process accounts database events only
        handle_event(message)
```
Configuration
Server-Side
Configure Kafka publishing in the Lambda function or server:
Environment Variables:
```sh
KAFKA_BROKERS=localhost:9092
DATABASE_EVENTS_KAFKA_TOPIC=evidentsource-database-events
BASE_URI=https://api.example.com
```
Lambda SAM Template:
```yaml
Parameters:
  KafkaBrokers:
    Type: String
    Description: Comma-separated Kafka brokers
  DatabaseEventsTopic:
    Type: String
    Default: evidentsource-database-events
  BaseUri:
    Type: String
    Description: Base URI for CloudEvents source attribute

Environment:
  Variables:
    KAFKA_BROKERS: !Ref KafkaBrokers
    DATABASE_EVENTS_KAFKA_TOPIC: !Ref DatabaseEventsTopic
    BASE_URI: !Ref BaseUri
```
Consumer Best Practices
- Consumer Groups: Use consumer groups for parallel processing

  ```python
  consumer = KafkaConsumer(
      'evidentsource-database-events',
      group_id='my-consumer-group',  # Share load across instances
      enable_auto_commit=True
  )
  ```

- Error Handling: Use dead letter topics for failed messages

  ```java
  @KafkaListener(topics = "evidentsource-database-events",
                 errorHandler = "kafkaErrorHandler")
  ```

- Idempotency: Handle duplicate deliveries using ce_id

  ```python
  processed_ids = set()

  ce_id = headers.get(b'ce_id').decode('utf-8')
  if ce_id in processed_ids:
      continue  # Skip duplicate
  process_event(message)
  processed_ids.add(ce_id)
  ```

- Offset Management: Commit offsets only after successful processing

  ```go
  msg, err := consumer.ReadMessage(-1)
  if err == nil {
      processMessage(msg)
      consumer.CommitMessage(msg) // Commit after success
  }
  ```
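Expanding on the error-handling practice above, the sketch below forwards failed messages to a dead letter topic. The topic name evidentsource-database-events-dlq and the process_event function are hypothetical; EvidentSource does not provision a DLQ for you.
```python
from kafka import KafkaConsumer, KafkaProducer

DLQ_TOPIC = 'evidentsource-database-events-dlq'  # hypothetical topic you create yourself

def process_event(message):
    ...  # your processing logic (placeholder)

consumer = KafkaConsumer(
    'evidentsource-database-events',
    bootstrap_servers=['localhost:9092'],
    group_id='my-consumer-group',
)
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

for message in consumer:
    try:
        process_event(message)
    except Exception:
        # Forward the original key, value, and headers for later inspection or replay
        producer.send(DLQ_TOPIC, key=message.key, value=message.value,
                      headers=list(message.headers))
```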
Protobuf Schemas
The event payloads use Protocol Buffers. Download the schema definitions:
- domain.proto - Database and BatchSummary messages
- cloudevents.proto - CloudEvents core
Generate code for your language:
```sh
# Python
protoc --python_out=. domain.proto

# Java
protoc --java_out=. domain.proto

# Go
protoc --go_out=. domain.proto

# JavaScript/TypeScript
protoc --js_out=import_style=commonjs,binary:. domain.proto
```
Monitoring
Key Metrics to Track
- Consumer Lag: Time behind the latest message

  ```sh
  kafka-consumer-groups --bootstrap-server localhost:9092 \
    --group my-consumer-group --describe
  ```

- Processing Rate: Messages processed per second
- Error Rate: Failed message processing
- Gap Detection: Missing revisions detected
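One way to track the processing, error, and gap metrics from application code is with a Prometheus client. The sketch below assumes the prometheus_client library and uses illustrative metric names.
```python
from kafka import KafkaConsumer
from prometheus_client import Counter, start_http_server

# Illustrative metric names; align them with your monitoring conventions.
EVENTS_PROCESSED = Counter('evidentsource_events_processed_total', 'Messages processed')
EVENTS_FAILED = Counter('evidentsource_events_failed_total', 'Messages that failed processing')
GAPS_DETECTED = Counter('evidentsource_revision_gaps_total', 'Revision gaps detected')

start_http_server(8000)  # expose /metrics for Prometheus to scrape

consumer = KafkaConsumer(
    'evidentsource-database-events',
    bootstrap_servers=['localhost:9092'],
    group_id='my-consumer-group',
)

for message in consumer:
    try:
        # ... your processing logic; call GAPS_DETECTED.inc() from the
        # gap-detection check shown earlier when a gap is found ...
        EVENTS_PROCESSED.inc()
    except Exception:
        EVENTS_FAILED.inc()
```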
CloudEvents-Aware Tools
Many tools support CloudEvents natively:
- Knative Eventing: Route events based on CloudEvents attributes
- Argo Events: Trigger workflows from CloudEvents
- Kafka Streams: Process with CloudEvents-aware deserializers
- Flink: Stream processing with CloudEvents support
Troubleshooting
Issue: Missing Headers
Problem: Consumer doesn’t see ce_* headers
Solution: Ensure your Kafka client preserves headers:
```python
# Enable headers in consumer
consumer = KafkaConsumer(
    'evidentsource-database-events',
    bootstrap_servers=['localhost:9092'],
    # Headers are enabled by default in recent versions
)
```
Issue: Protobuf Parse Errors
Problem: ParseFromString fails
Solution: Verify you’re using the correct message type for each event type:
- DatabaseCreated → Database
- BatchTransacted → BatchSummary
- DatabaseDeleted → empty payload
Issue: Out-of-Order Events
Problem: Events arrive in wrong order
Solution: Ensure consuming from a single partition per database:
- Events for one database always go to the same partition (by key)
- Consumer should process partitions sequentially
- Check for consumer rebalancing causing replay
Security
Server-Side Configuration
EvidentSource servers and Lambda functions authenticate to Kafka using environment variables or CloudFormation parameters:
| Parameter/Variable | Description |
|---|---|
| KAFKA_SECURITY_PROTOCOL | Security protocol: PLAINTEXT, SSL, or SASL_SSL |
| KAFKA_SASL_MECHANISM | SASL mechanism: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, or AWS_MSK_IAM |
| KAFKA_SASL_USERNAME | SASL username (not needed for AWS_MSK_IAM) |
| KAFKA_SASL_PASSWORD | SASL password (not needed for AWS_MSK_IAM) |
AWS MSK with IAM Authentication (Recommended)
```sh
KAFKA_SECURITY_PROTOCOL=SASL_SSL
KAFKA_SASL_MECHANISM=AWS_MSK_IAM
# No username/password needed - uses IAM role
```
SCRAM Authentication
```sh
KAFKA_SECURITY_PROTOCOL=SASL_SSL
KAFKA_SASL_MECHANISM=SCRAM-SHA-512
KAFKA_SASL_USERNAME=evidentsource
KAFKA_SASL_PASSWORD=your-secret-password
```
See Kafka Prerequisites for complete server-side security setup.
Client-Side TLS/SSL
Configure encrypted Kafka connections for consumers:
```python
consumer = KafkaConsumer(
    'evidentsource-database-events',
    bootstrap_servers=['broker:9093'],
    security_protocol='SSL',
    ssl_cafile='/path/to/ca-cert',
    ssl_certfile='/path/to/client-cert',
    ssl_keyfile='/path/to/client-key'
)
```
Client-Side SASL Authentication
```python
consumer = KafkaConsumer(
    'evidentsource-database-events',
    bootstrap_servers=['broker:9093'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='username',
    sasl_plain_password='password'
)
```
Client-Side AWS MSK IAM Authentication
For AWS MSK clusters with IAM authentication:
```python
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

# Create token provider
tp = MSKAuthTokenProvider(region='us-east-1')

consumer = KafkaConsumer(
    'evidentsource-database-events',
    bootstrap_servers=['broker:9098'],
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp
)
```
Next Steps
- Explore gRPC API for writing events
- See REST API for HTTP access
- Learn about Client Libraries
- Read CloudEvents Concepts for background
- Check Deployment Guide for production setup