
Merge Operator

Status: Draft

Authors:

References:

To utilize SlateDB as a fully-featured state backend for a stream processor, it must support three key types of state:

  1. Value: Latest value or aggregating state.
  2. List / Bag / Buffering: State that stores multiple items for later processing.
  3. Map: Key-value state for fine-grained data.

Examples of all three state types are common across the stream processing ecosystem; Flink, for instance, exposes value, list, and map state to user functions.

State-of-the-art implementations built on RocksDB often leverage the Merge Operator to efficiently manage types 1 and 2. A prime example is list state, where elements are appended continuously until triggered by an event like a window firing. While this method may not always be the absolute optimal solution, it serves as a solid foundation for future iterations and enhancements.

The primary goal is to introduce a new user-facing record type (Merge Operand) and a Merge Operator interface to SlateDB, allowing applications to bypass the traditional read/modify/update cycle in performance-critical situations where computation can be expressed using an associative operator.

Supporting non-associative operators is not a goal of this RFC. The assumption is that associative operators can cover the majority of use cases while keeping the user-facing API simple and significantly reducing the scope. Badger has adopted the same limitation.

We will introduce a new record type (Merge Operand) and a Merge Operator interface to SlateDB, allowing applications to bypass the traditional read/modify/update cycle in performance-critical situations where computation can be expressed using an associative operator.

This will allow users to express partial updates to an accumulator using an associative operator, which will be merged during reads/compactions to produce the final result.

This plays nicely with existing record types: merge operands can be interleaved with values and tombstones and will be correctly merged during reads/compactions. Both tombstones and values act as barriers that clear any previous merge history for the same key.

This approach is similar to how RocksDB and Badger handle merge operations. We’re not inventing anything new here, but rather combining ideas from these projects to fit our use case. The main innovation that is not present in either RocksDB or Badger is the handling of TTLs.

We will introduce two new methods for writing Merge Operands into the database. They follow the same structure as existing put methods.

impl Db {
    pub async fn merge(
        &self,
        key: &[u8],
        value: &[u8]
    ) -> Result<(), SlateDBError> {
        ...
    }

    pub async fn merge_with_options(
        &self,
        key: &[u8],
        value: &[u8],
        merge_opts: &MergeOptions,
        write_opts: &WriteOptions
    ) -> Result<(), SlateDBError> {
        ...
    }
}

impl WriteBatch {
    pub fn merge(
        &self,
        key: &[u8],
        value: &[u8]
    ) -> Result<(), SlateDBError> {
        ...
    }

    pub fn merge_with_options(
        &self,
        key: &[u8],
        value: &[u8],
        merge_opts: &MergeOptions
    ) -> Result<(), SlateDBError> {
        ...
    }
}

#[derive(Clone, Default)]
pub struct MergeOptions {
    /// Merges with different TTLs will only be merged together during read but will be stored separately
    /// otherwise to allow them to expire independently. This ensures proper TTL handling while still
    /// allowing merge operations to work correctly.
    pub ttl: Ttl,
}
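
To make the intended call pattern concrete, here is a usage sketch for a counter-style workload. It assumes the database was opened with a merge operator that adds little-endian u64 operands (the operator interface is introduced below) and that put and get are the existing fallible async calls.

async fn counter_example(db: &Db) -> Result<(), SlateDBError> {
    db.put(b"hits", &10u64.to_le_bytes()).await?;   // base value: 10
    db.merge(b"hits", &1u64.to_le_bytes()).await?;  // partial update: +1
    db.merge(b"hits", &2u64.to_le_bytes()).await?;  // partial update: +2

    // The read applies the merge operator to the accumulated history: 10 + 1 + 2 = 13.
    let _merged = db.get(b"hits").await?;

    // A put (or delete) acts as a barrier that clears the merge history for the key.
    db.put(b"hits", &0u64.to_le_bytes()).await?;
    Ok(())
}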

We also need to introduce a new trait that will allow users to implement custom merging logic.

pub trait MergeOperator {
    fn merge(
        &self,
        key: Bytes,
        existing_value: Bytes,
        value: Bytes
    ) -> Result<Bytes, MergeOperatorError>;
}

Initially, the implementation is limited to a single optional merge operator per database. The user must ensure that both the compactor and writer use the same merge operator to guarantee correct results.

The merge operator can be implemented to route to different merge strategies based on key prefixes. Here’s an example:

struct MyMergeOperator;

impl MergeOperator for MyMergeOperator {
    fn merge(
        &self,
        key: Bytes,
        existing_value: Bytes,
        value: Bytes
    ) -> Result<Bytes, MergeOperatorError> {
        if key.starts_with(b"list:") {
            // concat values
            Ok(...)
        } else if key.starts_with(b"min:") {
            // calculate min
            Ok(...)
        } else {
            Err(...)
        }
    }
}

It is the user's responsibility to ensure that the merge operator is correctly implemented and that the same merge operator implementation is used across all components (compactor, writer, garbage collector).

Users can provide their own merge operator implementation via DbOptions. If not provided, the database will not support merge operations.

pub struct DbOptions {
    ...
    /// The merge operator to use for the database. If not set, the database will not support merge operations.
    #[serde(skip)]
    pub merge_operator: Option<Arc<dyn MergeOperator + Send + Sync>>,
}
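
For example, wiring the prefix-routing operator from the example above into the options might look like the following sketch (it assumes DbOptions keeps a Default implementation for its remaining fields):

use std::sync::Arc;

fn options_with_merge_operator() -> DbOptions {
    DbOptions {
        merge_operator: Some(Arc::new(MyMergeOperator)),
        ..DbOptions::default()
    }
}

The writer, compactor, and garbage collector processes must all be configured with the same operator to produce consistent results.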

The last public API change is extending the Ttl enum to support a new ExpireAt(ts) variant. This allows users to set a specific expiration time for a key, which overrides the default TTL behavior.

pub enum Ttl {
    ...
    ExpireAt(i64),
}

See TTL Handling with Merge Operations for more details.

We need to introduce an additional variant of ValueDeletable that denotes a Merge Operand. As a result of this change, we can no longer represent the deletable as Option<Bytes> or Option<&[u8]>, because a non-empty state can represent either a value or a merge operand. This requires reusing the ValueDeletable data structure throughout the stack—all the way from the memtable to the row codec.

#[derive(Debug, Clone, PartialEq)]
pub enum ValueDeletable {
    ...
    Merge(Bytes),
    ...
}

The WAL and memtable share a common underlying data structure (WritableKVTable). To support merge operations, we need to extend this structure to handle both merge operations and value overrides:

  • For merge operations, we need to maintain multiple entries per key in order to preserve the sequence of merge operands
  • For put and delete operations, we continue to override any previous values for that key

The WritableKVTable must preserve the order of operations to enable proper reconstruction of the merge sequence during reads, while still allowing puts and deletes to clear the merge history. Maintaining this hybrid approach is critical for handling merge-related error cases, such as:

  1. If merging operands fails (e.g. due to incompatible data formats), we need the full history to potentially retry with different merge strategies or fall back to the last known good value
  2. If merge operands have different TTLs, we need to track the individual expiry times and may need to fall back to earlier values when operands expire

In both cases, maintaining the operation history allows us to handle these edge cases gracefully rather than losing data.
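
To make this hybrid behavior concrete, here is a minimal sketch of how a per-key entry could be represented and updated. The names and layout are hypothetical and do not reflect the actual WritableKVTable internals; they only illustrate the override-vs-accumulate rules above.

use bytes::Bytes;

// Hypothetical per-key state: puts and deletes collapse the entry, while merge
// operands accumulate in write order so the full history can be replayed later.
enum KeyState {
    Value(Bytes),
    Tombstone,
    // Optional base value (if a put preceded the merges) plus ordered operands.
    Merges { base: Option<Bytes>, operands: Vec<Bytes> },
}

impl KeyState {
    fn apply_put(&mut self, value: Bytes) {
        *self = KeyState::Value(value); // overrides and clears any merge history
    }

    fn apply_delete(&mut self) {
        *self = KeyState::Tombstone; // also clears any merge history
    }

    fn apply_merge(&mut self, operand: Bytes) {
        let next = match std::mem::replace(self, KeyState::Tombstone) {
            // A merge on top of a value keeps that value as the base.
            KeyState::Value(v) => KeyState::Merges { base: Some(v), operands: vec![operand] },
            // A merge on top of a tombstone starts a fresh history with no base.
            KeyState::Tombstone => KeyState::Merges { base: None, operands: vec![operand] },
            // Subsequent merges are appended, preserving operation order.
            KeyState::Merges { base, mut operands } => {
                operands.push(operand);
                KeyState::Merges { base, operands }
            }
        };
        *self = next;
    }
}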

We need to slightly adjust the binary format of the SST file and introduce a new flag to denote a merge operand. Note that the Tombstone and Merge flags are mutually exclusive.

Aside from this, Merge rows can follow the same structure as Value rows.

This change is backwards compatible.

The block format remains unchanged. It’s useful to highlight that merge operands are also stored in the bloom filters, so they have the same filtering guarantees as regular values.

Merges are expected to be read using the standard get API.

For each key, SlateDB maintains a history of operations, which is subject to compaction.

Let’s denote each operation as OPi. A key (K) that has experienced n changes looks like this logically (physically, the changes could be spread across the memtable and multiple SST files):

K: OP1 OP2 OP3 ... OPn
earliest --------------------> latest

Currently, during a read operation, SlateDB always observes the latest known state of the database. In other words, it only needs to look up the latest operation. If it’s a Value, it simply returns it, and if it’s a Tombstone, it returns None. All previous operations can be ignored and will be removed by compaction at some point.

K: OP1 OP2 OP3 ... OPn
                   ^
                   |
                   GET

With the addition of a Merge Operand, it may be necessary to look backwards at previous values, up to the point where we encounter either a Value or a Tombstone. Everything beyond that point can be ignored.

K:  OP1    OP2    OP3    OP4    OPn
           Value  Merge  Merge  Merge
                                ^
                                |
                                GET

In the above example, get should essentially return something like:

Merge(OP2, Merge(OP3, Merge(OP4, OPn)))
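
In code, this resolution step is essentially a fold over the collected operands with the configured operator. Below is a sketch, assuming the backwards scan gathered the operands oldest-to-newest and that base is the Value that terminated the scan (None if the scan hit a tombstone or the start of history):

use bytes::Bytes;

fn resolve_merges(
    operator: &dyn MergeOperator,
    key: Bytes,
    base: Option<Bytes>,
    operands: Vec<Bytes>,
) -> Result<Option<Bytes>, MergeOperatorError> {
    let mut iter = operands.into_iter();
    // With no base value, the oldest operand becomes the initial accumulator.
    let mut acc = match base {
        Some(v) => v,
        None => match iter.next() {
            Some(first) => first,
            None => return Ok(None), // no value and no operands: key is absent
        },
    };
    for operand in iter {
        acc = operator.merge(key.clone(), acc, operand)?;
    }
    Ok(Some(acc))
}

Because the operator is required to be associative, this left-to-right fold produces the same result as the right-nested expression shown above.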

During compaction, we continuously reduce the history by merging entries according to the rules described above. This process performs partial merges along the way, combining merge operands with their base values when possible. We also merge entries in the memtable as they arrive to minimize unnecessary history storage.

Compaction operates on a set of source SST files and writes to a destination SST file. The process reads all source SSTs sequentially and produces a single destination SST containing the merged results.

The table below shows how different combinations of previous and new values are handled during compaction:

Previous     New          Result
tombstone    value        value
tombstone    merge        value
tombstone    tombstone    tombstone
value        merge        value
value        value        value
value        tombstone    tombstone
merge        merge        merge
merge        value        value
merge        tombstone    tombstone
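
Read as a pairwise rule over the ValueDeletable variants, the table could be implemented roughly as follows. This is a sketch that assumes the remaining variants are Value(Bytes) and Tombstone; failure handling is discussed later.

use bytes::Bytes;

// `previous` is the older entry for the key, `new` the newer one; the result
// replaces both in the output SST.
fn combine(
    operator: &dyn MergeOperator,
    key: Bytes,
    previous: ValueDeletable,
    new: ValueDeletable,
) -> Result<ValueDeletable, MergeOperatorError> {
    use ValueDeletable::*;
    Ok(match (previous, new) {
        // A newer put or delete simply wins.
        (_, Tombstone) => Tombstone,
        (_, Value(v)) => Value(v),
        // A merge applied on top of a value produces a new value.
        (Value(base), Merge(operand)) => Value(operator.merge(key, base, operand)?),
        // A merge on top of a tombstone starts from the operand alone.
        (Tombstone, Merge(operand)) => Value(operand),
        // Two merge operands combine into a single merge operand.
        (Merge(older), Merge(newer)) => Merge(operator.merge(key, older, newer)?),
    })
}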

This part assumes that the Transactions RFC is already in place.

Compaction must preserve snapshot isolation to avoid affecting externally observable state. RocksDB provides a proven solution to this problem that we can adopt.

The key insight is that merging must stop whenever we encounter a snapshot barrier, as this represents a point where a transaction may be reading from. Below is an adapted example from RocksDB’s documentation showing how this works in practice, where the + symbol represents a merge operation:

K:  0     +1    +2    +3    +4    +5    2     +1    +2
                ^           ^                       ^
                |           |                       |
                snapshot1   snapshot2               snapshot3

We show it step by step, as we scan from the newest operation to the oldest operation

K:  0     +1    +2    +3    +4    +5    2     (+1 +2)
                ^           ^                     ^
                |           |                     |
                snapshot1   snapshot2             snapshot3

A Merge operation consumes a previous Merge operation and produces a new Merge operation
(+1 +2) => Merge(1,2) => +3

K:  0     +1    +2    +3    +4    +5    2     +3
                ^           ^                 ^
                |           |                 |
                snapshot1   snapshot2         snapshot3

K:  0     +1    +2    +3    +4    +5    (2 +3)
                ^           ^              ^
                |           |              |
                snapshot1   snapshot2      snapshot3

A Merge operation consumes a previous Put operation and produces a new Put operation
(2 +3) => Merge(2, 3) => 5

K:  0     +1    +2    +3    +4    +5    5
                ^           ^           ^
                |           |           |
                snapshot1   snapshot2   snapshot3

A newly produced Put operation hides any previous operations up to the snapshot barrier
(+5 5) => 5

K:  0     +1    +2    (+3 +4)         5
                ^         ^           ^
                |         |           |
                snapshot1 snapshot2   snapshot3

(+3 +4) => Merge(3,4) => +7

K:  0     +1    +2          +7          5
                ^           ^           ^
                |           |           |
                snapshot1   snapshot2   snapshot3

A Merge operation cannot consume a previous snapshot barrier
(+2 +7) can not be combined

K:  0     (+1 +2)           +7          5
              ^             ^           ^
              |             |           |
              snapshot1     snapshot2   snapshot3

(+1 +2) => Merge(1,2) => +3

K:  0     +3          +7          5
          ^           ^           ^
          |           |           |
          snapshot1   snapshot2   snapshot3

K:  (0 +3)        +7          5
       ^          ^           ^
       |          |           |
       snapshot1  snapshot2   snapshot3

(0 +3) => Merge(0,3) => 3

K:  3           +7          5
    ^           ^           ^
    |           |           |
    snapshot1   snapshot2   snapshot3
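
The rule can be captured by a small helper: two adjacent entries for a key may only be combined if no live snapshot falls between their sequence numbers. A sketch follows; the sequence numbers and the list of live snapshots are assumed to come from the transaction machinery.

// A snapshot at sequence `s` observes every entry with seq <= s, so a snapshot
// falling between two entries would see the older one but not the newer one.
fn can_combine(older_seq: u64, newer_seq: u64, snapshots: &[u64]) -> bool {
    debug_assert!(older_seq <= newer_seq);
    !snapshots.iter().any(|&s| older_seq <= s && s < newer_seq)
}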

SlateDB supports two TTL approaches:

  1. Operation-Level TTL

    • Each operation (put/merge) has its own independent TTL, specified via:
      • Ttl::ExpireAfter(duration): Expires after specified duration (internally this is implemented as ExpireAt(Instant::now() + duration))
      • Ttl::ExpireAt(timestamp): Expires at specified timestamp
    • Enables per-element expiration in collections
  2. TTL Renewal (NOT SUPPORTED NATIVELY)

    • Refreshing TTL for entire key would require READ + MODIFY + UPDATE
    • Use standard PUT API if this behavior is needed

When merging values with different TTLs, the merge operation only combines values during reads while keeping entries separate in storage, allowing independent expiration per TTL.

Unlike regular values which become tombstones upon expiration, expired merge entries are simply removed, enabling per-element expiration in collections.

Users can implement custom TTL patterns by consistently using either ExpireAt or ExpireAfter across operations.
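
As an illustration, per-element expiration in a buffered list might look like the sketch below. It assumes a concatenating merge operator for list:-prefixed keys (as in the routing example earlier), that WriteOptions implements Default, and that the ExpireAt timestamp uses the database's usual expiry units; the key name is made up.

// Each appended element carries its own expiration timestamp, so elements drop
// out of the list independently as they expire.
async fn append_with_expiry(
    db: &Db,
    element: &[u8],
    expire_at_ts: i64,
) -> Result<(), SlateDBError> {
    db.merge_with_options(
        b"list:recent-events",
        element,
        &MergeOptions { ttl: Ttl::ExpireAt(expire_at_ts) },
        &WriteOptions::default(),
    )
    .await
}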

If there are multiple merge operands available for the same key, they will be ordered by their corresponding sequence numbers.

Merge operations can fail for several reasons:

  • The user-defined merge operator returns an error during execution
  • No merge operator is configured for the database

These failures can occur in three different code paths:

  1. Write Path
    • During merging of entries into the WAL or memtable
    • On failure: Unmerged operands are preserved in the WAL/memtable and retry occurs on next startup
  2. Read Path
    • When merging entries during a read operation
    • On failure: Error is propagated to the user
  3. Compaction Path
    • When the user-defined merge operator fails during compaction
    • On failure: Unmerged operands are preserved in the SST file and retry occurs on next compaction

We plan to validate the merge operator implementation with the following benchmarks focused on key use cases:

  • Append N elements to a list using merge operations and reading the final result vs read-modify-write
    • Compare performance for different value sizes
    • Expectation: Merge operations should show significant performance improvements over read-modify-write
  • Increment counters using merge operations vs read-modify-write
    • Compare performance for different value sizes
    • Expectation: Merge operations should be faster than read-modify-write against cached records and should show significant improvements over read-modify-write for uncached records.
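
For the counter scenario above, the two code paths under comparison would look roughly like the sketch below, assuming an additive u64 merge operator is configured and that put and get are the usual fallible async calls. The merge path never reads on write, so the expected win grows with the number of updates between reads.

// Increment via merge: no read on the write path; the addition happens lazily
// on read or during compaction.
async fn increment_with_merge(db: &Db, n: u64) -> Result<(), SlateDBError> {
    for _ in 0..n {
        db.merge(b"counter", &1u64.to_le_bytes()).await?;
    }
    let _total = db.get(b"counter").await?; // operands are combined here
    Ok(())
}

// Increment via read-modify-write: every update pays for a read, a decode, and a put.
async fn increment_with_rmw(db: &Db, n: u64) -> Result<(), SlateDBError> {
    for _ in 0..n {
        let current = db
            .get(b"counter")
            .await?
            .map(|v| u64::from_le_bytes(v.as_ref().try_into().unwrap()))
            .unwrap_or(0);
        db.put(b"counter", &(current + 1).to_le_bytes()).await?;
    }
    Ok(())
}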

The benchmarks will be implemented and maintained as part of the existing benchmark suite to validate the merge operator performance characteristics. No benchmark results are available yet as the feature is still under development.

We expect the merge operator to show significant benefits for buffering use cases by eliminating read-modify-write cycles. The actual performance improvements will be quantified once the implementation is complete.

The currently proposed implementation allows for a single merge operator per database. While this may seem limiting, the current interfaces allow implementing support for multiple operators in userspace by routing to different implementations based on key prefixes or other criteria. This approach aligns with RocksDB’s design philosophy.

If compelling use cases emerge that demonstrate clear benefits of first-class support for multiple merge operators, we can consider adding this capability in a future iteration. However, the current single-operator design provides a good balance of simplicity and flexibility for most use cases.

One possible optimization is to introduce a new read option for persisting the result of a GET operation when it was merged from multiple operands. This would allow for faster subsequent GETs by avoiding the need to recompute the merge. The result could be stored as a regular value in the memtable or an SST file. Although this optimization sounds appealing, it’s not clear whether the benefits outweigh the added complexity and storage overhead. For certain use cases, such as buffering, it might have a negative performance impact.

It might be useful to have a read option that allows for early termination of merge operations. This could be beneficial for buffering use cases where the user wants to limit the number of operands that are merged together (effectively allowing partial iterations).

RocksDB provides an alternative approach by exposing GetMergeOperands which allows listing the unmerged operands directly.