
Subcompactions

Status: Accepted

Authors:

This RFC proposes subcompactions: compactions over non-overlapping sub-ranges of the key space, persisted durably as part of a logical parent compaction. Partitioning a logical compaction into subcompactions lets us execute the sub-ranges in parallel while still resuming a compaction at subcompaction granularity after a failure.

SlateDB’s current compaction throughput is bottlenecked on CPU, with no ability to execute compactions concurrently. On some production workloads, users found SlateDB compaction limited to processing 20-30 MB/s. This is especially problematic under the default size-tiered compaction strategy, where large sorted runs can span dozens to hundreds of gigabytes.

Note that this is complementary to distributed compactions. Distributed compactions allow concurrent compactions to execute on multiple workers while subcompactions allow an individual worker to parallelize execution of a single compaction. We can eventually extend SlateDB to allow subcompactions to execute on separate workers within the distributed worker framework, but that is out of scope for this RFC.

Goals:

  • Parallelize a single logical compaction across multiple key ranges.
  • Define subcompaction boundaries aligned with SSTs.
  • Support persisting subcompactions so that range-level resume is possible.
  • Keep backward compatibility with existing .compactions files.

Non-goals:

  • Distributed execution of subcompactions.

This RFC proposes a new concept to SlateDB: the subcompaction. While a compaction necessarily covers the entire key range and outputs a complete SortedRun, a subcompaction only covers a partial key range. Subcompactions are only valid within the context of a parent compaction.

I find it’s easiest to understand the concept by looking at a simple but complete example. This example compacts a single sorted run (i.e. just one input SR, which can happen if you want to periodically compact the base run).

In this case, the input sorted run has 3 SSTs that cover the full key range: 91, 92 & 93. The subcompaction scheduler (the heuristic is discussed later) decided to split this into two sub-ranges of the full keyspace covering (-∞, k) and [k, +∞).

┌───────────────────input: SR(9)───────────────────┐
├SST(91)─────────┬SST(92)─────────┬SST(93)─────────┤
│████████████████│▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓│▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒│
├────────────────┴───────┬────────┴────────────────┤
┌┤ subcompaction A │ subcompaction B ├┐
│└─ ─ ─ ─ (-∞,k)─ ─ ─ ─ ─ ┴─ ─ ─ ─ ─(k,+∞)─ ─ ─ ─ ─ ┘│
│ │
│ │
│┌SST(93)─────────┬SST(94)┬SST(95)──────────┬SST(96)┐│
└▶████████████████│▓▓▓▓▓▓▓│▓▓▓▓▓▓▓▓▓▒▒▒▒▒▒▒▒│▒▒▒▒▒▒▒◀┘
├────────────────┴───────┴─────────────────┴───────┤
└──────────────────output: SR(10)──────────────────┘

This example will run two subcompactions:

  1. Input SST(91), SST(92) with view range covering keys (-∞, k)
  2. Input SST(92), SST(93) with view range covering keys [k, +∞)
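The following is a minimal Rust sketch of the overlap check that decides which input SSTs each subcompaction reads. It assumes hypothetical SstInfo and KeyRange types, not SlateDB's. In these types an SST is described only by the inclusive first/last keys recorded in the manifest. Range ends use std::ops::Bound so that (-∞, k) and [k, +∞) can be written directly. This is an illustration, not the actual sst.overlaps() implementation.

```rust
use std::ops::Bound;

/// Hypothetical SST descriptor: an id plus the inclusive
/// [first_key, last_key] range recorded in the manifest.
#[derive(Debug, Clone)]
pub struct SstInfo {
    pub id: u32,
    pub first_key: Vec<u8>,
    pub last_key: Vec<u8>,
}

/// Hypothetical subcompaction key range expressed with std bounds.
pub struct KeyRange {
    pub start: Bound<Vec<u8>>,
    pub end: Bound<Vec<u8>>,
}

impl SstInfo {
    /// True if any key in [first_key, last_key] can fall inside `range`.
    pub fn overlaps(&self, range: &KeyRange) -> bool {
        // The SST must not end before the range starts...
        let starts_ok = match &range.start {
            Bound::Unbounded => true,
            Bound::Included(s) => self.last_key >= *s,
            Bound::Excluded(s) => self.last_key > *s,
        };
        // ...and must not start after the range ends.
        let ends_ok = match &range.end {
            Bound::Unbounded => true,
            Bound::Included(e) => self.first_key <= *e,
            Bound::Excluded(e) => self.first_key < *e,
        };
        starts_ok && ends_ok
    }
}

/// Ids of the input SSTs a subcompaction over `range` must read.
pub fn inputs_for(ssts: &[SstInfo], range: &KeyRange) -> Vec<u32> {
    ssts.iter().filter(|s| s.overlaps(range)).map(|s| s.id).collect()
}
```

For example, assume SST(91) = [a, g], SST(92) = [h, p] and SST(93) = [q, z], with k inside SST(92). Then (-∞, k) selects SST(91) and SST(92), and [k, +∞) selects SST(92) and SST(93). This matches the two subcompactions listed above.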

Each subcompaction will output SSTs that cover the range assigned to it. In this case, each will need to output two SSTs to respect the max SST size configuration. The parent compaction will then update the manifest with the result of the compaction, recording that the new sorted run SR(10) contains all four SSTs.

Note 1: this demonstrates a limitation of subcompactions: they may generate more SSTs than required, several of which are smaller than the max SST size.

Note 2: the heuristic proposed later in this RFC would not produce this specific example. The heuristic only considers SST boundaries as split points, whereas k here falls inside SST(92). Splitting within an SST is still possible with this RFC’s approach when merging multiple SRs, because an SST boundary in one SR generally falls inside an SST of another SR. I used this example to illustrate that case.
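The fragmentation in Note 1 comes down to simple arithmetic: each subcompaction runs its own writer and rounds its SST count up independently. The sketch below assumes that output bytes equal input bytes (nothing is dropped by the merge). Sizes are illustrative, not SlateDB defaults.

```rust
/// SSTs a single writer produces for `bytes` of output when it cuts a new
/// SST every `max_sst_size` bytes (ceiling division).
pub fn ssts_for(bytes: u64, max_sst_size: u64) -> u64 {
    if bytes == 0 {
        0
    } else {
        (bytes + max_sst_size - 1) / max_sst_size
    }
}

/// Total output SSTs when each subcompaction range has its own writer.
pub fn ssts_with_subcompactions(range_bytes: &[u64], max_sst_size: u64) -> u64 {
    range_bytes.iter().map(|&b| ssts_for(b, max_sst_size)).sum()
}
```

Take a 256 MB max SST size. Compacting 768 MB as one range needs 3 SSTs. Splitting the same data into two 384 MB subcompactions needs 4, the same count as SR(10) in the diagram above.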

The pseudocode below outlines the entire subcompaction lifecycle. The key point is that it builds on three existing primitives:

  (a) the existing compaction logic runs nearly as-is within the scope of a subcompaction;
  (b) it reuses the SST view concept, with sst.overlaps(), so that input SSTs shared by several ranges are read by each of them;
  (c) it persists the output of subcompactions to the .compactions file, which now includes subcompactions (see the spec in the next section).

// executes a full compaction with subcompactions
execute_compaction(spec):
  // the state contains information on the current progress of subcompactions,
  // so that we can resume interrupted/failed ones from their last successful
  // state write
  state := fetch(000X.compactions)
  if state.subs does not exist:
    state.subs := build_subcompactions(spec)
    persist state.subs in 000X.compactions
  // we can parallelize the subcompactions up to the configured
  // max_subcompactions (every planned subcompaction may run concurrently)
  #parallel(max_subcompactions)
  for sub in state.subs.incomplete():
    run_subcompaction(spec, sub)
  if any subcompaction failed:
    mark parent compaction failed
    return
  // the final construction of the Sorted Run is just applying metadata and
  // marking the state as compacted
  result_ssts := concatenate sub.output_ssts in range order
  sr := to_sr(result_ssts)
  mark parent compaction compacted
  // only the parent compaction job will ever attempt to update the manifest
  // (via the coordinator as outlined in RFC-25)
  update_manifest(sr)
  mark parent compaction completed

// runs a subcompaction by applying the subcompaction range to any input SSTs
// and then merging them using the existing compaction path to build output
// SSTs
run_subcompaction(spec, sub):
  mark subcompaction running
  views := []
  for sst in spec.input_ssts:
    if sst.overlaps(sub.range):
      views += sst.range(sub.range)
  iter := merge(views)
  // on resume, skip keys already covered by sub.output_ssts
  if sub.output_ssts is not empty:
    seek iter to resume after the last key of the last sst in sub.output_ssts
  writer := Writer::new()
  for key in iter:
    writer.add(key)
    if writer.sst_full():
      // this goes directly to object storage
      persist SST and start a new writer
      // this goes over an rx/tx channel to the parent compaction, which handles
      // persisting progress to the .compactions file to avoid too much contention
      persist progress
  // flush the final, possibly undersized, SST
  if writer is not empty:
    persist SST
  persist subcompaction completed with sub.output_ssts
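The execution half of this lifecycle can be sketched in Rust with std::thread::scope. The sketch spawns one thread per subcompaction, joins the outputs back in range order, and concatenates them into the new sorted run. It is a simplified model under stated assumptions:

  • Keys are hypothetical u32 values.
  • A BTreeSet merge-and-dedup stands in for SlateDB's merge iterator, which also resolves versions and tombstones.
  • An output "SST" is just a Vec cut every max_entries keys, and max_entries must be non-zero.
  • Persistence, progress reporting, and failure handling are omitted.

```rust
use std::collections::BTreeSet;
use std::thread;

/// Hypothetical half-open key range [start, end); `None` means unbounded.
#[derive(Clone, Copy, Debug)]
pub struct SubRange {
    pub start: Option<u32>,
    pub end: Option<u32>,
}

impl SubRange {
    fn contains(&self, key: u32) -> bool {
        self.start.map_or(true, |s| key >= s) && self.end.map_or(true, |e| key < e)
    }
}

/// One subcompaction: merge every input run restricted to `range` (dedup
/// stands in for the real merge iterator) and cut an output "SST" every
/// `max_entries` keys.
fn run_subcompaction(inputs: &[Vec<u32>], range: SubRange, max_entries: usize) -> Vec<Vec<u32>> {
    let merged: BTreeSet<u32> = inputs
        .iter()
        .flat_map(|run| run.iter().copied().filter(move |k| range.contains(*k)))
        .collect();
    let keys: Vec<u32> = merged.into_iter().collect();
    keys.chunks(max_entries).map(|c| c.to_vec()).collect()
}

/// Runs every subcompaction on its own scoped thread and concatenates their
/// output SSTs in range order to form the new sorted run.
pub fn execute_compaction(inputs: &[Vec<u32>], ranges: &[SubRange], max_entries: usize) -> Vec<Vec<u32>> {
    thread::scope(|s| {
        let handles: Vec<_> = ranges
            .iter()
            .map(|&r| s.spawn(move || run_subcompaction(inputs, r, max_entries)))
            .collect();
        // Joining in spawn order keeps the outputs in range order.
        handles
            .into_iter()
            .flat_map(|h| h.join().expect("subcompaction panicked"))
            .collect()
    })
}
```

Joining the handles in spawn order is what preserves range order without a sort. The scoped threads can borrow the inputs directly because the scope guarantees they finish before execute_compaction returns. Each range contributes at most one undersized trailing SST, which is the fragmentation discussed in Note 1.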

It’s worth noting that the main coordinator is still responsible for submitting the compaction, but the worker is responsible for computing the subcompaction plan (splits). This allows future optimizations where the subcompaction planner can pull information like SST indexes to improve the split algorithm without affecting the cache on the active serving node in distributed compaction.

To support resuming partial compactions, we need a durable model for subcompactions. This closely mirrors the existing Compaction model, relying on output_ssts and status to track how far along the subcompaction is:

table Compaction {
  id: Ulid (required);
  spec: CompactionSpec (required);
  status: CompactionStatus;
  output_ssts: [CompactedSsTable]; // will be empty if running subcompactions
  worker: WorkerSpec;
  subcompactions: [Subcompaction]; // <---- NEW
}

table Subcompaction {
  range: BytesRange (required);
  status: CompactionStatus;
  output_ssts: [CompactedSsTable];
}

Resuming subcompactions works roughly the same way that resuming compactions does today. When we resume a subcompaction, we check its output_ssts and seek the merge iterator just past the last key of the last SST in that list.
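Below is a minimal Rust sketch of this resume logic, under these assumptions:

  • The types are hypothetical, and each persisted output SST is reduced to its last key.
  • Status is a stand-in for CompactionStatus.
  • Keys in the merged stream are unique. Real entries carry sequence numbers, which this sketch ignores.

The absent-vector case mirrors the compatibility rule for .compactions files written before this RFC.

```rust
/// Hypothetical stand-in for CompactionStatus.
#[derive(Debug, Clone, PartialEq)]
pub enum Status {
    Pending,
    Running,
    Completed,
}

/// Hypothetical in-memory mirror of the Subcompaction table.
#[derive(Debug, Clone)]
pub struct Subcompaction {
    pub status: Status,
    /// Last key of each output SST persisted so far, in write order.
    pub output_last_keys: Vec<Vec<u8>>,
}

/// `None`: start from the beginning of the range. `Some(k)`: every key <= k
/// is already in a persisted output SST.
pub fn resume_after(sub: &Subcompaction) -> Option<&[u8]> {
    sub.output_last_keys.last().map(|k| k.as_slice())
}

/// Keys of the merged stream that still need to be written.
pub fn keys_to_write<'a>(merged: &[&'a [u8]], sub: &Subcompaction) -> Vec<&'a [u8]> {
    match resume_after(sub) {
        None => merged.to_vec(),
        Some(last) => merged.iter().copied().filter(|k| *k > last).collect(),
    }
}

/// Indices of subcompactions a resumed parent still has to run. An absent
/// vector (a pre-RFC .compactions file) yields none, so the caller falls
/// back to running the compaction without subcompactions.
pub fn incomplete(subs: Option<&[Subcompaction]>) -> Vec<usize> {
    subs.unwrap_or_default()
        .iter()
        .enumerate()
        .filter(|(_, s)| s.status != Status::Completed)
        .map(|(i, _)| i)
        .collect()
}
```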

Note on GC: subcompaction output SSTs are protected by the existing compaction low-watermark. The parent compaction stays in the in-flight set until every subcompaction completes, and the output SSTs carry ULIDs newer than the parent’s id, so they sit above the watermark and cannot be collected. The boundary file plays only its usual role of keeping old .compactions files around long enough to compute that watermark.
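The watermark argument can be sketched as follows. This is a deliberately simplified, hypothetical model, not SlateDB's GC code:

  • ULIDs are represented as u128 values, since ULIDs sort by creation time.
  • The low-watermark is assumed to be the smallest in-flight compaction id.
  • Other GC rules, such as minimum age, are omitted.

```rust
/// Assumed low-watermark: the smallest id among in-flight compactions.
pub fn low_watermark(in_flight_compaction_ids: &[u128]) -> Option<u128> {
    in_flight_compaction_ids.iter().copied().min()
}

/// Hypothetical eligibility check for an SST (ULID modeled as u128).
/// Referenced SSTs are never collected; unreferenced ones only when they are
/// older than the watermark. Other GC rules (e.g. minimum age) are omitted.
pub fn gc_eligible(sst_id: u128, referenced_by_manifest: bool, watermark: Option<u128>) -> bool {
    if referenced_by_manifest {
        return false;
    }
    match watermark {
        Some(w) => sst_id < w,
        None => true,
    }
}
```

While the parent (id 100 here) is in flight, a subcompaction output with a newer id such as 120 is not eligible, even though the manifest does not reference it yet.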

The goal of boundary selection is to make the subcompactions roughly equally sized without spending too much time figuring this out (i.e. only by reading the manifest).

For v0 of subcompactions, we’ll only consider SST boundaries as potential split points, since the first/last keys and size estimates of all SSTs are available directly in the manifest. The algorithm for determining splits is:

// imagine three SSTs with ranges [a-k],[c-m],[f-z], then the candidates are
// [a, c, f, k, m, z] and the candidate intervals are [(a,c), (c,f), (f,k),
// (k,m), (m,z)]
candidate_keys := sorted, deduplicated union of input SST/view starts and ends
intervals := adjacent candidate key ranges
for interval in intervals:
  interval.weight := sum size_estimate(sst) for every input sst overlapping interval
// if the full interval weights are 600MB, and we want three subcompactions
// then the target range would cover 200MB each so we sum the intervals until
// we get close to the target and select that as the range
target := sum(interval.weight) / desired_subcompactions
accumulated := 0
for interval in intervals (except the last):
  accumulated += interval.weight
  if accumulated >= target and fewer than desired_subcompactions - 1 boundaries emitted:
    emit boundary at interval end
    accumulated := 0
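Here is a Rust sketch of this split heuristic. It follows the pseudocode literally: every overlapping SST's full size estimate is added to each interval it touches, and the accumulator resets after each emitted boundary. SstMeta and split_points are hypothetical names, and keys are Strings for readability.

```rust
/// Hypothetical manifest-level view of an input SST.
pub struct SstMeta {
    pub first_key: String,
    pub last_key: String,
    pub size_estimate: u64,
}

/// Returns up to `desired - 1` split keys; each key starts a new sub-range.
pub fn split_points(ssts: &[SstMeta], desired: usize) -> Vec<String> {
    if desired <= 1 {
        return Vec::new();
    }
    // Candidate keys: every SST start and end, sorted and deduplicated.
    let mut candidates: Vec<&str> = ssts
        .iter()
        .flat_map(|s| [s.first_key.as_str(), s.last_key.as_str()])
        .collect();
    candidates.sort_unstable();
    candidates.dedup();

    // Weight each adjacent interval (lo, hi) by the full size estimate of
    // every SST that overlaps it.
    let weights: Vec<u64> = candidates
        .windows(2)
        .map(|w| {
            ssts.iter()
                .filter(|s| s.first_key.as_str() < w[1] && s.last_key.as_str() > w[0])
                .map(|s| s.size_estimate)
                .sum::<u64>()
        })
        .collect();

    let total: u64 = weights.iter().sum();
    let target = total / desired as u64;
    let mut boundaries = Vec::new();
    let mut accumulated: u64 = 0;
    // Never split after the last interval: that would leave an empty range.
    for (i, weight) in weights.iter().enumerate().take(weights.len().saturating_sub(1)) {
        accumulated += *weight;
        if accumulated >= target && boundaries.len() < desired - 1 {
            // This interval ends at candidates[i + 1].
            boundaries.push(candidates[i + 1].to_string());
            accumulated = 0;
        }
    }
    boundaries
}
```

Take the three SSTs from the comment above and give each a 100 MB estimate. The interval weights are then 100, 200, 300, 200 and 100 (900 total). Three subcompactions therefore target 300 each, and the splits land at f and k. Counting an SST's full estimate in every interval it overlaps overweights wide SSTs. This imprecision is inherent to using only manifest metadata.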
  • max_subcompactions configures the desired number of subcompactions for a compaction, with any number <=1 disabling subcompactions. Default 4.
  • min_subcompaction_input_bytes prevents small subcompactions (which would create fragmented SSTs) from being scheduled. Defaults to 2 x max_sst_size.
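One way the two settings could combine when planning a compaction is sketched below. The sketch assumes each subcompaction must receive at least min_subcompaction_input_bytes, so the count is the input size divided by that minimum, clamped to [1, max_subcompactions]. That rule is an assumption about the intended semantics, not a specification. SubcompactionOptions and desired_subcompactions are hypothetical names.

```rust
/// Hypothetical options struct mirroring the two proposed settings.
pub struct SubcompactionOptions {
    pub max_subcompactions: usize,
    pub min_subcompaction_input_bytes: u64,
}

impl SubcompactionOptions {
    /// Defaults from this RFC: 4 subcompactions, min input = 2 x max_sst_size.
    pub fn with_defaults(max_sst_size: u64) -> Self {
        Self { max_subcompactions: 4, min_subcompaction_input_bytes: 2 * max_sst_size }
    }

    /// Assumed planning rule: never give a subcompaction less than the
    /// minimum input, and never exceed max_subcompactions.
    pub fn desired_subcompactions(&self, input_bytes: u64) -> usize {
        if self.max_subcompactions <= 1 {
            return 1; // disabled: one range covering the whole keyspace
        }
        let by_size = (input_bytes / self.min_subcompaction_input_bytes.max(1)) as usize;
        by_size.clamp(1, self.max_subcompactions)
    }
}
```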

We could optionally introduce max_concurrent_subcompactions to allow scheduling more subcompactions than we actively run concurrently. However, until we decide to integrate this with the distributed compaction worker framework, I think it’s better to delay that.

Each subcompaction creates its own CompactionFilter via the supplier and invokes on_compaction_end once when its sub-range is exhausted, so a logical compaction now fires on_compaction_end once per subcompaction (concurrently) rather than once overall. This is safe for the documented use cases of the hook—flushing state, logging statistics, or cleaning up resources—since each is naturally scoped to the entries that instance observed, and it generalizes the existing resume contract where a resumed compaction already builds a fresh filter that observes only a partial key stream. Filters that instead need to aggregate across the full keyspace must do so via shared state on the CompactionFilterSupplier.
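For filters that need whole-compaction aggregates, the shared-state pattern can be sketched with an Arc<AtomicU64>. The supplier owns it and clones it into every filter it creates. CountingSupplier, CountingFilter and their methods are hypothetical stand-ins, not SlateDB's CompactionFilterSupplier or CompactionFilter signatures. The point is only that each filter merges its own state into supplier-owned state in on_compaction_end, and that these calls may run concurrently.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical supplier; `total_seen` spans the whole logical compaction.
pub struct CountingSupplier {
    total_seen: Arc<AtomicU64>,
}

/// Hypothetical per-subcompaction filter with its own local counter.
pub struct CountingFilter {
    seen: u64,
    total_seen: Arc<AtomicU64>,
}

impl CountingSupplier {
    pub fn new() -> Self {
        Self { total_seen: Arc::new(AtomicU64::new(0)) }
    }

    /// Called once per subcompaction; every filter shares the same counter.
    pub fn create_filter(&self) -> CountingFilter {
        CountingFilter { seen: 0, total_seen: Arc::clone(&self.total_seen) }
    }

    pub fn total(&self) -> u64 {
        self.total_seen.load(Ordering::SeqCst)
    }
}

impl CountingFilter {
    /// Observes one entry and keeps it.
    pub fn filter(&mut self, _key: &[u8]) -> bool {
        self.seen += 1;
        true
    }

    /// Fires once per subcompaction, possibly concurrently with other
    /// filters; the atomic add makes the merge order-independent.
    pub fn on_compaction_end(&mut self) {
        self.total_seen.fetch_add(self.seen, Ordering::SeqCst);
    }
}
```

Suppose three filters on separate threads observe 3, 5 and 7 entries. The supplier's total ends at 15 regardless of the order in which their on_compaction_end calls fire.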

SlateDB features and components that this RFC interacts with. Check all that apply.

  • Basic KV API (get/put/delete)
  • Range queries, iterators, seek semantics
  • Range deletions
  • Error model, API errors

Consistency, Isolation, and Multi-Versioning

  • Transactions
  • Snapshots
  • Sequence numbers
  • Time to live (TTL)
  • Compaction filters
  • Merge operator
  • Change Data Capture (CDC)
  • Manifest format
  • Checkpoints
  • Clones
  • Garbage collection
  • Database splitting and merging
  • Multi-writer
  • Compaction state persistence
  • Compaction filters
  • Compaction strategies
  • Distributed compaction
  • Compactions format
  • Write-ahead log (WAL)
  • Block cache
  • Object store cache
  • Indexing (bloom filters, metadata)
  • SST format or block format
  • CLI tools
  • Language bindings (Go/Python/etc)
  • Observability (metrics/logging/tracing)

This is discussed in the main body of the RFC as it’s primarily a performance optimization. It enables better utilization of CPU at the cost of potential SST fragmentation at subcompaction boundaries.

| Metric | Type | Measures |
| --- | --- | --- |
| slatedb.compactor.running_subcompactions | up/down counter | Subcompactions executing concurrently right now. |
| slatedb.compactor.subcompactions_per_compaction | histogram | Number of subcompactions the heuristic produced per parent compaction. |
| slatedb.compactor.subcompaction_duration_sec | histogram | Per-range wall-clock time from start to output SSTs persisted. |
| slatedb.compactor.subcompactions_resumed | counter | Subcompactions resumed from persisted state rather than started fresh. |

Rather than introducing new throughput metrics, we rely on the existing aggregate slatedb.compactor.bytes_compacted and slatedb.compactor.total_throughput_bytes_per_sec metrics, which remain authoritative for overall throughput.

  • Wire format: subcompactions is appended to the end of the Compaction table, so the change is forward/backward compatible. Old readers skip the field; new readers see an empty vector on pre-existing .compactions files and run the compaction without subcompactions.
  • Rollback: a pre-subcompaction binary ignores the subcompactions field, sees empty output_ssts, and re-runs the whole compaction single-threaded. This is correct but abandons any already-written subcompaction output SSTs. Those orphans are never referenced by the manifest and are reclaimed by GC (see below). No manual cleanup is required; rolling back only wastes in-flight work.
  • Garbage collection: subcompaction output SSTs are written before the manifest commit but carry ULIDs newer than the parent compaction’s id. The parent stays in the in-flight set until every subcompaction completes. This pins the compaction low-watermark at or below the parent’s id, and therefore below those SSTs. As a result, the existing GC protection covers them with no version-coupled change (even when GC is running on an older code version).

Beyond the usual set of tests, the testing plan includes running slatedb/src/compaction_execute_bench.rs to confirm that subcompactions improve compaction throughput in practice. We could consider modeling this with Fizzbee, but I don’t think it’s necessary.

I opted to set max_subcompactions = 4 by default, instead of disabling subcompactions by default, because I think it’s a pretty foundational feature for large compaction jobs. Note that this departs from prior art: RocksDB ships with it disabled (max_subcompactions = 1), but RocksDB also does not ship with an equivalent of min_subcompaction_input_bytes.

See the compatibility section for why this is safe to roll out in one go.

L0 sublevels (Pebble’s approach). Rejected. Pebble does not split a single logical compaction across key ranges; it parallelizes by running multiple independent compactions at once, gated by MaxConcurrentCompactions. It makes this possible with L0 sublevels and flush splitting, which carve the keyspace into non-overlapping ranges so several L0->Lbase compactions can proceed without conflicting.

This works for Pebble because it is a leveled engine where the parallelism opportunity lives at the L0->Lbase boundary. It does not help the case this RFC targets: under size-tiered compaction a single large sorted-run compaction is still one indivisible unit of work on one core, and no amount of cross-compaction concurrency speeds it up. Subcompactions parallelize within that unit, which is the only approach that addresses big SR compactions.

Index Metadata Split Algorithms. Deferred. The idea here is to use the index metadata on SSTs to determine where to split, instead of the rough heuristic outlined in this RFC. It’s probably worth doing, but I’ve deferred the details to a v2 implementation.

Parallelizing block construction within a single range. Deferred/Partial Rejection. This is a valid alternative to the RFC in that it allows more threads to work on a compaction. To be honest, I think it’s hard to tell whether this is a better alternative without implementing it. It is likely to be more complicated to implement, and to parallelize less of the overall compaction workload. In addition, it makes the resume mechanism a little more challenging: we need to synchronize the production of sub-ranges, which may cause lagging threads to serialize compaction. I’m inclined to reject.

It is also not mutually exclusive with the proposal in this RFC. If we wanted to optimize a single subcompaction, we could apply parallelism within that subcompaction (or within a top-level compaction, which shares the same logic).