Kafka Streaming API

The Kafka Streaming API publishes database events (database creation, batch transactions, deletions) to Apache Kafka in real time. Events follow the CloudEvents 1.0 specification for interoperability with standard event streaming tools.

EvidentSource uses four Kafka topics for different purposes. Understanding these topics and their access patterns is essential for integration.

Topic                 | Default Name                        | Access       | Purpose
----------------------|-------------------------------------|--------------|--------------------------------------
Transaction Proposals | evidentsource-transaction-proposals | Internal     | Distributed transaction coordination
Database Events       | evidentsource-database-events       | Public Read  | Database lifecycle notifications
Async Commands        | evidentsource-async-commands        | Public Write | Asynchronous command submission
Async Responses       | evidentsource-async-responses       | Public Read  | Asynchronous command results

Transaction Proposals (evidentsource-transaction-proposals)

This topic is used internally by the Kafka linearizer for distributed transaction coordination between EvidentSource cluster nodes.

Warning: Do not consume from or produce to this topic from external applications. Doing so may corrupt transaction ordering and data consistency.

Database Events (evidentsource-database-events)

Subscribe to this topic to receive notifications when:

  • A database is created
  • Events are transacted (batches committed)
  • A database is deleted

This is the primary integration point for building downstream systems. See Event Types below for message formats.

Async Commands (evidentsource-async-commands)

Produce to this topic to submit commands for asynchronous processing. This is useful for:

  • Bulk data ingestion pipelines
  • Fire-and-forget command submission
  • Decoupling producers from EvidentSource request latency (a producer sketch follows this list)
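
As a hedged sketch only: submitting a command with kafka-python, assuming async commands follow the same binary-mode CloudEvents conventions as the events topic. The ce_type value, the message key, and the command payload encoding shown here are placeholders, so consult the async command reference for the real schema.

from uuid import uuid4
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

command_id = str(uuid4())
command_payload = b''  # Protobuf-encoded command bytes go here (schema not shown)

producer.send(
    'evidentsource-async-commands',
    key=b'accounts',                                # keyed by database name (assumption)
    value=command_payload,
    headers=[
        ('ce_specversion', b'1.0'),
        ('ce_id', command_id.encode('utf-8')),      # responses are expected to reference
                                                    # this via ce_correlationid (assumption)
        ('ce_source', b'https://my-app.example.com'),
        ('ce_type', b'com.example.command.v1'),     # placeholder type
        ('ce_datacontenttype', b'application/protobuf'),
    ],
)
producer.flush()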

Async Responses (evidentsource-async-responses)

Consume from this topic to receive results of asynchronously submitted commands. Messages include:

  • ce_correlationid header linking back to the original command
  • Success or error status
  • Transaction details on success (a consumer sketch follows this list)
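
A hedged kafka-python sketch of correlating responses with previously submitted commands. Only the documented ce_correlationid header is inspected here; the success/error status and transaction details live in the payload, whose schema is documented with the async command API. The pending map and its values are application-side placeholders.

from kafka import KafkaConsumer

# Correlation IDs of commands we submitted and are still waiting on (placeholder values)
pending = {'7b1d2c3e-0000-0000-0000-000000000000': 'nightly bulk load'}

consumer = KafkaConsumer('evidentsource-async-responses',
                         bootstrap_servers=['localhost:9092'],
                         group_id='async-result-watcher')

for message in consumer:
    headers = dict(message.headers)
    correlation_id = headers.get('ce_correlationid', b'').decode('utf-8')
    if correlation_id in pending:
        # Status and transaction details are in the payload (not parsed in this sketch)
        print(f"Got result for '{pending.pop(correlation_id)}' "
              f"({len(message.value)} bytes)")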

Topics must be pre-created before deploying EvidentSource. See Kafka Prerequisites for:

  • Topic configuration recommendations
  • Access control setup (IAM, ACLs)
  • AWS MSK integration guide

EvidentSource can publish DatabaseEvent notifications to Kafka whenever:

  • A database is created
  • A batch of events is transacted
  • A database is deleted

This enables building real-time downstream systems like:

  • Materialized views - Update read models in external databases
  • Search indexing - Keep Elasticsearch/Solr in sync
  • Event-driven workflows - Trigger business processes
  • Analytics - Stream events to data warehouses
  • Notifications - Send alerts based on event patterns

All Kafka messages conform to the CloudEvents Kafka Binding specification using binary content mode:

  • Metadata is stored in Kafka message headers (prefixed with ce_)
  • Event data is stored in the message body (Protobuf-encoded)
  • Message key is the database name (ensures ordering per database); a short sketch of reading all three parts follows
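
As a quick illustration of binary content mode with kafka-python (broker address and group ID below are placeholders), the three pieces can be read independently:

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'evidentsource-database-events',
    bootstrap_servers=['localhost:9092'],   # placeholder broker
    group_id='binary-mode-demo'             # placeholder group
)

for message in consumer:
    # Binary content mode: CloudEvents attributes live in the headers...
    attributes = {key: value.decode('utf-8')
                  for key, value in message.headers
                  if key.startswith('ce_')}
    # ...the database name is the message key...
    database_name = message.key.decode('utf-8')
    # ...and the Protobuf-encoded event data is the message body.
    payload = message.value
    print(database_name, attributes.get('ce_type'), len(payload))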

Every message includes these standard CloudEvents headers:

Header             | Type      | Description                | Example
-------------------|-----------|----------------------------|---------------------------------------
ce_specversion     | string    | CloudEvents version        | 1.0
ce_id              | string    | Unique event ID (UUID)     | a3e8f7c2-1234-5678-9abc-def012345678
ce_source          | URI       | Event source identifier    | https://api.example.com/db/accounts
ce_type            | string    | Event type identifier      | com.evidentsource.batch.transacted.v1
ce_time            | timestamp | Event timestamp (RFC 3339) | 2024-01-15T10:30:00.123Z
ce_datacontenttype | string    | Payload content type       | application/protobuf
ce_subject         | string    | Event subject (optional)   | Batch ID for transacted events

EvidentSource adds custom extension attributes for revision tracking:

Header      | Type   | Present On                       | Description
------------|--------|----------------------------------|-------------------------------------
ce_revision | uint64 | DatabaseCreated, BatchTransacted | Database revision after this event
ce_basis    | uint64 | DatabaseCreated, BatchTransacted | Database revision before this event

These attributes enable consumers to:

  • Detect gaps in the event stream
  • Track database state progression
  • Validate event ordering and continuity
  • Skip deserialization when only metadata is needed (see the sketch below)
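
The last point can be as lightweight as reading the extension headers and never touching the Protobuf body; a minimal kafka-python sketch (broker address is a placeholder):

from kafka import KafkaConsumer

consumer = KafkaConsumer('evidentsource-database-events',
                         bootstrap_servers=['localhost:9092'])

for message in consumer:
    headers = dict(message.headers)
    if 'ce_revision' in headers:   # present on DatabaseCreated and BatchTransacted
        basis = int(headers['ce_basis'].decode('utf-8'))
        revision = int(headers['ce_revision'].decode('utf-8'))
        # Track state progression without parsing the Protobuf body
        print(f"{message.key.decode('utf-8')}: revision {basis} -> {revision}")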

DatabaseCreated (com.evidentsource.database.created.v1)

Published when a new database is created.

CloudEvents Headers:

ce_specversion: 1.0
ce_id: 550e8400-e29b-41d4-a716-446655440000
ce_source: https://api.example.com/db/accounts
ce_type: com.evidentsource.database.created.v1
ce_time: 2024-01-15T10:30:00.123Z
ce_datacontenttype: application/protobuf
ce_revision: 0
ce_basis: 0

Message Key:

accounts

Message Body (Protobuf):

message Database {
  string name = 1;                                    // "accounts"
  google.protobuf.Timestamp created_at = 2;
  uint64 basis = 3;                                   // 0
  uint64 revision = 4;                                // 0
  google.protobuf.Timestamp revision_timestamp = 5;
}

BatchTransacted (com.evidentsource.batch.transacted.v1)

Published when a batch of events is successfully transacted.

CloudEvents Headers:

ce_specversion: 1.0
ce_id: 7f3e8a1c-9876-5432-1fed-cba987654321
ce_source: https://api.example.com/db/accounts
ce_type: com.evidentsource.batch.transacted.v1
ce_time: 2024-01-15T10:30:05.456Z
ce_datacontenttype: application/protobuf
ce_subject: batch-550e8400-e29b-41d4-a716-446655440000
ce_revision: 150
ce_basis: 145

Message Key:

accounts

Message Body (Protobuf):

message BatchSummary {
  string id = 1;                                  // Batch ID (UUID)
  string database = 2;                            // "accounts"
  uint64 basis = 3;                               // Revision before transaction
  uint64 revision = 4;                            // Revision after transaction
  uint32 event_count = 5;                         // Number of events in batch
  google.protobuf.Timestamp recorded_time = 6;
}

Note: The ce_subject header contains the batch ID, allowing consumers to filter or route based on specific batches without deserializing the payload.
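
For example, a consumer can select specific batches by ce_subject and only then pay the cost of parsing the body. A kafka-python sketch; the interesting_batches set is a hypothetical application-side filter:

from kafka import KafkaConsumer
import database_pb2  # Generated from domain.proto

# Hypothetical application-side set of batch IDs worth inspecting
interesting_batches = {'batch-550e8400-e29b-41d4-a716-446655440000'}

consumer = KafkaConsumer('evidentsource-database-events',
                         bootstrap_servers=['localhost:9092'])

for message in consumer:
    headers = dict(message.headers)
    subject = headers.get('ce_subject')
    if subject and subject.decode('utf-8') in interesting_batches:
        # Only matching batches pay the deserialization cost
        batch = database_pb2.BatchSummary()
        batch.ParseFromString(message.value)
        print(f"Matched batch {batch.id} with {batch.event_count} events")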

DatabaseDeleted (com.evidentsource.database.deleted.v1)

Published when a database is permanently deleted.

CloudEvents Headers:

ce_specversion: 1.0
ce_id: c4f6b2d8-1111-2222-3333-444455556666
ce_source: https://api.example.com/db/old-database
ce_type: com.evidentsource.database.deleted.v1
ce_time: 2024-01-15T11:00:00.789Z
ce_datacontenttype: application/octet-stream

Message Key:

old-database

Message Body:

(empty)

Note: No ce_revision, ce_basis, or ce_subject headers are present for deletion events. The payload is empty.

By default, all DatabaseEvents are published to:

evidentsource-database-events

Configure via environment variable or parameter:

DATABASE_EVENTS_KAFKA_TOPIC=my-custom-topic

Events are partitioned by database name (message key):

  • All events for a database go to the same partition
  • Guarantees ordering within a database
  • Enables parallel processing across databases (a quick verification sketch follows this list)
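
If you want to confirm this mapping in your own cluster, you can record the partition each database's events arrive on; a small, purely illustrative kafka-python sketch:

from kafka import KafkaConsumer

consumer = KafkaConsumer('evidentsource-database-events',
                         bootstrap_servers=['localhost:9092'])

partition_by_database = {}

for message in consumer:
    database_name = message.key.decode('utf-8')
    first_seen = partition_by_database.setdefault(database_name, message.partition)
    # Key-based partitioning means a database should never change partitions
    if first_seen != message.partition:
        print(f"Unexpected: {database_name} seen on partitions "
              f"{first_seen} and {message.partition}")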

Within a partition (database):

  1. DatabaseCreated is always first
  2. BatchTransacted events follow in revision order
  3. DatabaseDeleted is last (if database is deleted)

The examples below show consuming database events in Python, Java, Go, and Node.js.

Python (kafka-python):

from kafka import KafkaConsumer
import database_pb2  # Generated from domain.proto

consumer = KafkaConsumer(
    'evidentsource-database-events',
    bootstrap_servers=['localhost:9092'],
    group_id='my-consumer-group',
    enable_auto_commit=True,
    auto_offset_reset='earliest'
)

for message in consumer:
    # Extract CloudEvents headers (kafka-python exposes header keys as strings, values as bytes)
    headers = dict(message.headers)
    ce_type = headers.get('ce_type').decode('utf-8')
    ce_source = headers.get('ce_source').decode('utf-8')
    ce_time = headers.get('ce_time').decode('utf-8')

    # Get database name from message key
    database_name = message.key.decode('utf-8')

    if ce_type == 'com.evidentsource.database.created.v1':
        db = database_pb2.Database()
        db.ParseFromString(message.value)
        print(f"Database created: {db.name} at revision {db.revision}")

    elif ce_type == 'com.evidentsource.batch.transacted.v1':
        # Check revision without deserializing
        ce_revision = int(headers.get('ce_revision').decode('utf-8'))
        ce_basis = int(headers.get('ce_basis').decode('utf-8'))

        # Deserialize if needed
        batch = database_pb2.BatchSummary()
        batch.ParseFromString(message.value)
        print(f"Batch {batch.id}: {batch.event_count} events, "
              f"revision {batch.basis} -> {batch.revision}")

    elif ce_type == 'com.evidentsource.database.deleted.v1':
        print(f"Database deleted: {database_name}")

Java (Spring Kafka):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import com.google.protobuf.InvalidProtocolBufferException;

@Service
public class DatabaseEventConsumer {

    @KafkaListener(topics = "evidentsource-database-events", groupId = "my-group")
    public void consume(ConsumerRecord<String, byte[]> record) {
        String ceType = getHeader(record, "ce_type");
        String ceSource = getHeader(record, "ce_source");
        String databaseName = record.key();

        switch (ceType) {
            case "com.evidentsource.database.created.v1":
                handleDatabaseCreated(record.value());
                break;
            case "com.evidentsource.batch.transacted.v1":
                long revision = Long.parseLong(getHeader(record, "ce_revision"));
                long basis = Long.parseLong(getHeader(record, "ce_basis"));
                handleBatchTransacted(record.value(), revision, basis);
                break;
            case "com.evidentsource.database.deleted.v1":
                handleDatabaseDeleted(databaseName);
                break;
        }
    }

    private void handleBatchTransacted(byte[] payload, long revision, long basis) {
        try {
            BatchSummary batch = BatchSummary.parseFrom(payload);
            System.out.printf("Batch %s: %d events, revision %d -> %d%n",
                    batch.getId(), batch.getEventCount(), batch.getBasis(), batch.getRevision());
        } catch (InvalidProtocolBufferException e) {
            // Handle error
        }
    }

    private String getHeader(ConsumerRecord<?, ?> record, String key) {
        Header header = record.headers().lastHeader(key);
        return header != null ? new String(header.value()) : null;
    }
}

Go (confluent-kafka-go):

package main

import (
    "fmt"
    "log"

    "github.com/confluentinc/confluent-kafka-go/kafka"
    pb "example.com/evidentsource/proto"
    "google.golang.org/protobuf/proto"
)

func main() {
    consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",
        "group.id":          "my-group",
        "auto.offset.reset": "earliest",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    consumer.Subscribe("evidentsource-database-events", nil)

    for {
        msg, err := consumer.ReadMessage(-1)
        if err != nil {
            log.Printf("Error: %v", err)
            continue
        }

        ceType := getHeader(msg, "ce_type")
        databaseName := string(msg.Key)

        switch ceType {
        case "com.evidentsource.database.created.v1":
            var db pb.Database
            if err := proto.Unmarshal(msg.Value, &db); err != nil {
                log.Printf("Parse error: %v", err)
                continue
            }
            fmt.Printf("Database created: %s at revision %d\n",
                db.Name, db.Revision)

        case "com.evidentsource.batch.transacted.v1":
            revision := getHeader(msg, "ce_revision")
            basis := getHeader(msg, "ce_basis")

            var batch pb.BatchSummary
            if err := proto.Unmarshal(msg.Value, &batch); err != nil {
                log.Printf("Parse error: %v", err)
                continue
            }
            fmt.Printf("Batch %s: %d events, revision %s -> %s\n",
                batch.Id, batch.EventCount, basis, revision)

        case "com.evidentsource.database.deleted.v1":
            fmt.Printf("Database deleted: %s\n", databaseName)
        }
    }
}

func getHeader(msg *kafka.Message, key string) string {
    for _, header := range msg.Headers {
        if header.Key == key {
            return string(header.Value)
        }
    }
    return ""
}

Node.js (KafkaJS and protobufjs):

const { Kafka } = require('kafkajs');
const protobuf = require('protobufjs');

// Load protobuf definitions
const root = protobuf.loadSync('domain.proto');
const Database = root.lookupType('Database');
const BatchSummary = root.lookupType('BatchSummary');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
});

const consumer = kafka.consumer({ groupId: 'my-group' });

async function run() {
  await consumer.connect();
  await consumer.subscribe({
    topic: 'evidentsource-database-events',
    fromBeginning: true
  });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      const headers = {};
      Object.entries(message.headers).forEach(([key, value]) => {
        headers[key] = value.toString();
      });

      const ceType = headers.ce_type;
      const databaseName = message.key.toString();

      switch (ceType) {
        case 'com.evidentsource.database.created.v1': {
          const db = Database.decode(message.value);
          console.log(`Database created: ${db.name} at revision ${db.revision}`);
          break;
        }
        case 'com.evidentsource.batch.transacted.v1': {
          const revision = parseInt(headers.ce_revision);
          const basis = parseInt(headers.ce_basis);
          const batch = BatchSummary.decode(message.value);
          console.log(`Batch ${batch.id}: ${batch.eventCount} events, ` +
            `revision ${batch.basis} -> ${batch.revision}`);
          break;
        }
        case 'com.evidentsource.database.deleted.v1':
          console.log(`Database deleted: ${databaseName}`);
          break;
      }
    },
  });
}

run().catch(console.error);

Use ce_revision and ce_basis to detect missing events:

last_revision = 0

for message in consumer:
    headers = dict(message.headers)
    ce_type = headers.get('ce_type').decode('utf-8')

    if ce_type != 'com.evidentsource.database.deleted.v1':
        ce_basis = int(headers.get('ce_basis').decode('utf-8'))
        ce_revision = int(headers.get('ce_revision').decode('utf-8'))

        if ce_basis != last_revision:
            print(f"GAP DETECTED: Expected basis {last_revision}, got {ce_basis}")
            # Handle gap: rewind, alert, etc.

        last_revision = ce_revision
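
The snippet above keeps a single counter, which assumes the consumer only ever sees one database. A sketch that tracks the last revision per database key instead:

from kafka import KafkaConsumer

consumer = KafkaConsumer('evidentsource-database-events',
                         bootstrap_servers=['localhost:9092'])

last_revision = {}  # database name -> last observed revision

for message in consumer:
    database_name = message.key.decode('utf-8')
    headers = dict(message.headers)
    ce_type = headers.get('ce_type').decode('utf-8')

    if ce_type == 'com.evidentsource.database.deleted.v1':
        last_revision.pop(database_name, None)  # nothing more to track
        continue

    basis = int(headers['ce_basis'].decode('utf-8'))
    revision = int(headers['ce_revision'].decode('utf-8'))
    expected = last_revision.get(database_name, 0)

    if basis != expected:
        print(f"GAP in {database_name}: expected basis {expected}, got {basis}")

    last_revision[database_name] = revision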

Use Kafka consumer filters or header-based routing:

@KafkaListener(
    topics = "evidentsource-database-events",
    groupId = "batch-only-consumer",
    containerFactory = "filteringContainerFactory"
)
public void consumeBatchesOnly(ConsumerRecord<String, byte[]> record) {
    String ceType = getHeader(record, "ce_type");
    if ("com.evidentsource.batch.transacted.v1".equals(ceType)) {
        // Process batch transactions only
        handleBatch(record.value());
    }
}
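
The same header-based filtering works in Python by checking ce_type before deserializing; a sketch reusing the kafka-python consumer pattern from above (handle_batch is a hypothetical application handler):

from kafka import KafkaConsumer
import database_pb2  # Generated from domain.proto

BATCH_TYPE = 'com.evidentsource.batch.transacted.v1'

consumer = KafkaConsumer('evidentsource-database-events',
                         bootstrap_servers=['localhost:9092'],
                         group_id='batch-only-consumer')

for message in consumer:
    headers = dict(message.headers)
    if headers.get('ce_type', b'').decode('utf-8') != BATCH_TYPE:
        continue  # Skip other event types without touching the payload
    batch = database_pb2.BatchSummary()
    batch.ParseFromString(message.value)
    handle_batch(batch)  # hypothetical application handler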

Consume only specific databases using topic partitioning:

// Subscribe to a specific partition for a database
// (requires knowing the partition mapping)
consumer.Assign([]kafka.TopicPartition{
    {Topic: &topic, Partition: accountsPartition},
})

Or filter after consumption:

for message in consumer:
    database_name = message.key.decode('utf-8')
    if database_name == 'accounts':
        # Process accounts database events only
        handle_event(message)

Configure Kafka publishing in the Lambda function or server:

Environment Variables:

KAFKA_BROKERS=localhost:9092
DATABASE_EVENTS_KAFKA_TOPIC=evidentsource-database-events
BASE_URI=https://api.example.com

Lambda SAM Template:

Parameters:
  KafkaBrokers:
    Type: String
    Description: Comma-separated Kafka brokers
  DatabaseEventsTopic:
    Type: String
    Default: evidentsource-database-events
  BaseUri:
    Type: String
    Description: Base URI for CloudEvents source attribute

Environment:
  Variables:
    KAFKA_BROKERS: !Ref KafkaBrokers
    DATABASE_EVENTS_KAFKA_TOPIC: !Ref DatabaseEventsTopic
    BASE_URI: !Ref BaseUri
Best practices for consumers:

  1. Consumer Groups: Use consumer groups for parallel processing.

     consumer = KafkaConsumer(
         'evidentsource-database-events',
         group_id='my-consumer-group',  # Share load across instances
         enable_auto_commit=True
     )

  2. Error Handling: Use dead letter topics for failed messages (a sketch follows this list).

     @KafkaListener(
         topics = "evidentsource-database-events",
         errorHandler = "kafkaErrorHandler"
     )

  3. Idempotency: Handle duplicate deliveries using ce_id.

     processed_ids = set()

     ce_id = headers.get('ce_id').decode('utf-8')
     if ce_id in processed_ids:
         continue  # Skip duplicate
     process_event(message)
     processed_ids.add(ce_id)

  4. Offset Management: Commit offsets only after successful processing.

     msg, err := consumer.ReadMessage(-1)
     if err == nil {
         processMessage(msg)
         consumer.CommitMessage(msg) // Commit after success
     }
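
For the dead-letter pattern in item 2 outside Spring, a minimal kafka-python sketch is shown below. The evidentsource-database-events-dlt topic name and the process_event handler are assumptions, not part of EvidentSource; create whatever dead letter topic your operations standards call for.

from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer('evidentsource-database-events',
                         bootstrap_servers=['localhost:9092'],
                         group_id='my-consumer-group',
                         enable_auto_commit=False)
dlt_producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

for message in consumer:
    try:
        process_event(message)  # hypothetical application handler
    except Exception as exc:
        # Forward the original bytes plus error context to the dead letter topic
        dlt_producer.send(
            'evidentsource-database-events-dlt',  # assumed DLT name
            key=message.key,
            value=message.value,
            headers=list(message.headers) + [('dlt_error', str(exc).encode('utf-8'))]
        )
    consumer.commit()  # commit either way so a poison message is not replayed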

The event payloads use Protocol Buffers. Download the schema definitions:

Generate code for your language:

# Python
protoc --python_out=. domain.proto
# Java
protoc --java_out=. domain.proto
# Go
protoc --go_out=. domain.proto
# JavaScript/TypeScript
protoc --js_out=import_style=commonjs,binary:. domain.proto
Key metrics to monitor:

  1. Consumer Lag: Time behind the latest message (a programmatic check is sketched after this list)

    kafka-consumer-groups --bootstrap-server localhost:9092 \
    --group my-consumer-group --describe
  2. Processing Rate: Messages processed per second

  3. Error Rate: Failed message processing

  4. Gap Detection: Missing revisions detected
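
Consumer lag can also be checked from application code by comparing committed offsets against end offsets. A kafka-python sketch; the broker address and group ID are the same placeholders used in the earlier examples:

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['localhost:9092'],
                         group_id='my-consumer-group')

topic = 'evidentsource-database-events'
partitions = [TopicPartition(topic, p)
              for p in consumer.partitions_for_topic(topic)]
end_offsets = consumer.end_offsets(partitions)

for tp in partitions:
    committed = consumer.committed(tp) or 0
    print(f"partition {tp.partition}: lag {end_offsets[tp] - committed}")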

Many tools support CloudEvents natively:

  • Knative Eventing: Route events based on CloudEvents attributes
  • Argo Events: Trigger workflows from CloudEvents
  • Kafka Streams: Process with CloudEvents-aware deserializers
  • Flink: Stream processing with CloudEvents support

Troubleshooting

Problem: Consumer doesn’t see ce_* headers

Solution: Ensure your Kafka client preserves headers:

# Headers are enabled by default in recent kafka-python client versions
consumer = KafkaConsumer(
    'evidentsource-database-events',
    bootstrap_servers=['localhost:9092']
)

Problem: ParseFromString fails

Solution: Verify you’re using the correct message type for each event type (a dispatch sketch follows this list):

  • DatabaseCreated → Database
  • BatchTransacted → BatchSummary
  • DatabaseDeleted → empty payload
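
One way to keep that mapping in one place is a small dispatch table keyed on ce_type that selects the right Protobuf class before parsing; a sketch using the generated database_pb2 module:

import database_pb2  # Generated from domain.proto

PAYLOAD_TYPES = {
    'com.evidentsource.database.created.v1': database_pb2.Database,
    'com.evidentsource.batch.transacted.v1': database_pb2.BatchSummary,
    'com.evidentsource.database.deleted.v1': None,  # empty payload
}

def decode_payload(ce_type, payload_bytes):
    message_class = PAYLOAD_TYPES.get(ce_type)
    if message_class is None:
        return None  # deletions carry no body
    message = message_class()
    message.ParseFromString(payload_bytes)
    return message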

Problem: Events arrive in wrong order

Solution: Ensure consuming from a single partition per database:

  • Events for one database always go to the same partition (by key)
  • Consumer should process partitions sequentially
  • Check for consumer rebalancing causing replay

Authentication

EvidentSource servers and Lambda functions authenticate to Kafka using environment variables or CloudFormation parameters:

Parameter / Variable    | Description
------------------------|---------------------------------------------------------------------
KAFKA_SECURITY_PROTOCOL | Security protocol: PLAINTEXT, SSL, or SASL_SSL
KAFKA_SASL_MECHANISM    | SASL mechanism: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, or AWS_MSK_IAM
KAFKA_SASL_USERNAME     | SASL username (not needed for AWS_MSK_IAM)
KAFKA_SASL_PASSWORD     | SASL password (not needed for AWS_MSK_IAM)

AWS MSK with IAM Authentication (Recommended)

KAFKA_SECURITY_PROTOCOL=SASL_SSL
KAFKA_SASL_MECHANISM=AWS_MSK_IAM
# No username/password needed - uses IAM role

SCRAM Authentication

KAFKA_SECURITY_PROTOCOL=SASL_SSL
KAFKA_SASL_MECHANISM=SCRAM-SHA-512
KAFKA_SASL_USERNAME=evidentsource
KAFKA_SASL_PASSWORD=your-secret-password

See Kafka Prerequisites for complete server-side security setup.

Configure encrypted Kafka connections for consumers:

# TLS with client certificates
consumer = KafkaConsumer(
    'evidentsource-database-events',
    bootstrap_servers=['broker:9093'],
    security_protocol='SSL',
    ssl_cafile='/path/to/ca-cert',
    ssl_certfile='/path/to/client-cert',
    ssl_keyfile='/path/to/client-key'
)

# SASL/SCRAM over TLS
consumer = KafkaConsumer(
    'evidentsource-database-events',
    bootstrap_servers=['broker:9093'],
    security_protocol='SASL_SSL',
    sasl_mechanism='SCRAM-SHA-512',
    sasl_plain_username='username',
    sasl_plain_password='password'
)

For AWS MSK clusters with IAM authentication:

from kafka import KafkaConsumer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider  # aws-msk-iam-sasl-signer-python package

# kafka-python expects an object exposing token() for OAUTHBEARER
class MSKTokenProvider:
    def token(self):
        token, _expiry = MSKAuthTokenProvider.generate_auth_token('us-east-1')
        return token

consumer = KafkaConsumer(
    'evidentsource-database-events',
    bootstrap_servers=['broker:9098'],
    security_protocol='SASL_SSL',
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=MSKTokenProvider()
)