Change Data Capture
Change data capture (CDC) turns writes in SlateDB into a durable change stream that external systems can consume. SlateDB exposes CDC through the WalReader, which reads write-ahead log (WAL) SST files from object storage and returns the same row entries that were written by the database.
This page explains how to build a CDC pipeline with WalReader, including ordering, durability, deletes, and resume semantics.
When to use WAL-based CDC
Use WAL-based CDC when you need:
- A low-latency stream of inserts, updates, and deletes.
- A simple, append-only feed that can be replayed.
- Integration with downstream systems like Kafka, Pulsar, or data lakes.
WAL-based CDC is not a bootstrap or backfill API. If you need an initial full copy of data, combine a backfill with WAL tailing as described below.
What WalReader reads
SlateDB stores WAL SSTs under the wal/ directory in the object store. Each WAL file has a monotonically increasing ID and is written in insertion order by sequence number. WalReader lists these files in ID order and exposes a WalFile for each file in object storage.
Each WalFile exposes an iterator() that yields RowEntry values in order. The iterator is optional because the underlying object might have been deleted between listing and reading (for example, due to GC).
Each row returned by the iterator is a RowEntry with:
- `key`: the key bytes
- `value`: a `ValueDeletable` variant (`Value`, `Merge`, or `Tombstone`)
- `seq`: a globally increasing sequence number
- `create_ts` and `expire_ts`: optional timestamps
Deletes appear as Tombstones. If your database uses merge operators, you will see Merge values and must apply the merge logic yourself if your sink requires materialized values.
Visibility and durability
WalReader only sees WAL SSTs that have been flushed to object storage. WAL flushes occur when:
- The WAL buffer reaches its size threshold (`l0_sst_size_bytes` is used to limit WAL SST file sizes as well).
- The configured `flush_interval` elapses.
- You explicitly call a WAL or MemTable flush with `Db::flush_with_options`.
Note that a memtable flush also forces any pending WAL data to be flushed first, which guarantees the WAL is durable before L0 data is written.
CDC architecture
```mermaid
flowchart LR
    A[Writer] --> B[WAL buffer]
    B --> C[WAL SSTs in object store]
    C --> D[WalReader]
    D --> E[CDC sink]
```
WalReader does not interpret or compact data. It simply provides a durable, ordered feed of row-level changes.
Basic tailer loop
This example tails all WAL files, emits row entries, and records a cursor so the stream can resume after restarts.
```rust
use slatedb::config::{FlushOptions, FlushType};
use slatedb::object_store::memory::InMemory;
use slatedb::{Db, RowEntry, ValueDeletable, WalFile, WalReader};
use std::sync::Arc;

#[derive(Debug, Default)]
struct CdcCursor {
    wal_id: u64,
    last_seq: u64,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let object_store = Arc::new(InMemory::new());
    let path = "/change-data-capture-example";

    let db = Db::open(path, object_store.clone()).await?;
    db.put(b"user:1", b"alice").await?;
    db.put(b"user:2", b"bob").await?;
    db.delete(b"user:2").await?;
    db.flush_with_options(FlushOptions {
        flush_type: FlushType::Wal,
    })
    .await?;

    let wal_reader = WalReader::new(path, object_store);
    let mut cursor = CdcCursor::default();

    // Use list() once for discovery (or after a long outage).
    for wal_file in wal_reader.list(cursor.wal_id..).await? {
        emit_wal_file(&wal_file, &mut cursor).await?;
    }

    // Poll by ID to avoid repeated full prefix listings.
    let next_file = wal_reader.get(cursor.wal_id + 1);
    emit_wal_file(&next_file, &mut cursor).await?;
    println!("Persist cursor periodically: {:?}", cursor);

    db.close().await?;
    Ok(())
}

async fn emit_wal_file(wal_file: &WalFile, cursor: &mut CdcCursor) -> anyhow::Result<()> {
    let mut iter = wal_file.iterator().await?;
    while let Some(row) = iter.next().await? {
        // Skip rows already covered by the cursor when resuming.
        if wal_file.id == cursor.wal_id && row.seq <= cursor.last_seq {
            continue;
        }
        emit_row(wal_file.id, &row);
        cursor.wal_id = wal_file.id;
        cursor.last_seq = row.seq;
    }
    Ok(())
}

fn emit_row(wal_id: u64, row: &RowEntry) {
    let key = String::from_utf8_lossy(row.key.as_ref());
    match &row.value {
        ValueDeletable::Value(value) => {
            let value = String::from_utf8_lossy(value.as_ref());
            println!("wal_id={wal_id} seq={} upsert {key}={value}", row.seq);
        }
        ValueDeletable::Merge(value) => {
            let value = String::from_utf8_lossy(value.as_ref());
            println!("wal_id={wal_id} seq={} merge {key}+={value}", row.seq);
        }
        ValueDeletable::Tombstone => {
            println!("wal_id={wal_id} seq={} delete {key}", row.seq);
        }
    }
}
```

This pattern is safe and idempotent as long as you persist the cursor after each emitted row and skip any rows with `seq <= last_seq` when resuming.
Backfills plus streaming
If you need a full backfill of data not in the WAL along with the WAL change stream:
1. Tail the WAL, buffering changes.
2. Checkpoint the database using `Admin::create_detached_checkpoint`.
3. Range scan (`Db::scan`) over the cloned database to read the old data.
4. Apply the buffered WAL changes in sequence order, filtering out rows whose sequence numbers were already covered by the range scan in step 3.
5. Delete the checkpoint using `Admin::delete_checkpoint`.
6. Continue tailing WALs from the stored cursor.
This avoids gaps between the backfill and the live stream. Your sink should be idempotent if the backfill and WAL stream can overlap.
Resumability and ordering
WalReader lists files in WAL ID order, and each WAL file stores entries in increasing sequence order. This gives you a total order that is stable across restarts. A minimal cursor includes:

- `wal_id`: the WAL file currently being processed
- `last_seq`: the highest sequence number processed in that file
If you need to fan out to multiple workers, partition by key and keep one cursor per partition. Ensure your sink can handle replays or duplicates. Since WAL files can be deleted between `list` and `iterator` calls, handle `None` by skipping that file or re-listing from a newer cursor.
Listing costs and polling strategy
The `list()` API can become expensive when WAL retention is high or GC is not keeping up. If the GC is not running, listings can grow without bound. Even with GC, CDC often needs higher retention. Retaining WAL files for just 1 hour can yield tens of thousands of files, which is expensive to list in both cost (object-store listing calls) and time.
If you plan to poll frequently:
- Use `list()` once to get an initial view (or to recover after a long outage).
- Track the highest WAL ID you have successfully processed.
- From then on, poll using `WalReader::get(latest_id + 1)`.
Deletes, merges, and TTL
- Deletes appear as `ValueDeletable::Tombstone`. Emit a delete event in your CDC sink.
- Merge operands appear as `ValueDeletable::Merge`. If you use merge operators, downstream consumers must either apply the merge or store the operand as-is.
- TTL is represented by `expire_ts`. If your downstream system enforces TTL, honor this field.
WAL retention and GC
WAL SSTs are garbage collected based on the GC configuration. If your CDC consumer falls behind, the WAL files it needs may be deleted. Set the WAL GC `min_age` option to retain files long enough for your slowest consumer and implement monitoring. Slow consumers should copy the WAL files elsewhere for processing. See Garbage Collection for details.
Using a separate WAL object store
If your database uses a dedicated WAL object store, pass that store to `WalReader::new` rather than the main store.