WAL Write Groups Under Tokio Async Context

Write-Ahead Logging in MuopDB

I am a contributor to MuopDB, a Rust vector database. Clients can perform four operations on a vector collection: insert, remove, search, and flush. Write-Ahead Logging (WAL) is already implemented to ensure data integrity. Every write request (either insert or remove) is first recorded in the WAL before it is applied to the actual data files. As soon as the WAL write is acknowledged, we send a response to the client. The actual data change is processed later by ingestion threads in the background.

A WAL entry is a contiguous byte array in a WAL file, storing the following fields in order:

  1. entry length (number of bytes in the rest of this WAL entry, stored as a 4-byte integer)
  2. number of document IDs
  3. number of user IDs
  4. all document IDs
  5. all user IDs
  6. vector data (empty for delete operations)
  7. operation type (0 for insert and 1 for delete)
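
As a rough illustration of this layout, the sketch below serializes one entry into an in-memory buffer. `encode_wal_entry` is a hypothetical helper, not part of MuopDB. It follows the field order and length formula of the `append` function shown in the next section, and it assumes little-endian encoding for IDs and floats (the real code appears to write the raw slice bytes instead).

```rust
/// Hypothetical helper (not MuopDB's API): serialize one WAL entry into a buffer.
/// Field order and length formula follow `WalFile::append`.
fn encode_wal_entry(doc_ids: &[u128], user_ids: &[u128], vector: &[f32], op_type: u8) -> Vec<u8> {
    // Payload size: two u64 counts, 16 bytes per ID, 4 bytes per f32, 1 byte of op type.
    let len = 8 + 8 + doc_ids.len() * 16 + user_ids.len() * 16 + vector.len() * 4 + 1;

    // The 4-byte length prefix is not counted in `len` itself.
    let mut buf = Vec::with_capacity(4 + len);
    buf.extend_from_slice(&(len as u32).to_le_bytes());
    buf.extend_from_slice(&(doc_ids.len() as u64).to_le_bytes());
    buf.extend_from_slice(&(user_ids.len() as u64).to_le_bytes());
    for id in doc_ids {
        buf.extend_from_slice(&id.to_le_bytes());
    }
    for id in user_ids {
        buf.extend_from_slice(&id.to_le_bytes());
    }
    // Assumption: little-endian floats; delete entries pass an empty vector.
    for x in vector {
        buf.extend_from_slice(&x.to_le_bytes());
    }
    buf.push(op_type); // 0 = insert, 1 = delete
    buf
}
```

For an insert with one document ID, one user ID, and a 128-dimensional vector, this gives a 561-byte payload plus the 4-byte length prefix.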

Since the WAL is used for crash recovery, we want each WAL entry to be written to disk as soon as possible. Normally, in most modern file systems, file writes are buffered in memory for anywhere between 5 and 30 seconds [1]. This is too long for database WAL data, where new write requests can come in every millisecond. If the system crashes before the WAL entries are persisted to disk, all of that data is lost. Thus, after writing all of the WAL entry’s bytes to the WAL file, we use the fsync() (or fdatasync()) [2] system call to force the WAL entries’ data to disk.

The Problem with fsync()

MuopDB is served by a gRPC server running under the Tokio runtime, which uses a multi-threaded, work-stealing scheduler. Each write request is handled by a separate Tokio task, which writes the WAL entry to the WAL file by calling the following append function:

impl WalFile {
    ...
    pub fn append(
        &mut self,
        doc_ids: &[u128],
        user_ids: &[u128],
        op_type: WalOpType<&[f32]>,
    ) -> Result<u64> {
        let (op_type, data): (u8, &[f32]) = match op_type {
            WalOpType::Insert(data) => (0, data),
            WalOpType::Delete => (1, &[]),
        };

        let len = (8 + 8 + doc_ids.len() * 16 + user_ids.len() * 16 + data.len() * 4 + 1) as u32;
        self.file.write_all(&len.to_le_bytes())?;
        self.file.write_all(&(doc_ids.len() as u64).to_le_bytes())?;
        self.file
            .write_all(&(user_ids.len() as u64).to_le_bytes())?;
        self.file.write_all(transmute_slice_to_u8(doc_ids))?;
        self.file.write_all(transmute_slice_to_u8(user_ids))?;
        self.file.write_all(transmute_slice_to_u8(data))?;
        self.file.write_all(&op_type.to_le_bytes())?;
        self.file.flush()?;
        self.file.sync_data()?;

        // Increment the number of entries in the file
        self.num_entries += 1;
        Ok((self.start_seq_no + self.num_entries as i64) as u64)
    }
}

This append function writes the WAL entry to the file and calls self.file.sync_data() to sync the data to disk, where self.file is a std::fs::File struct opened in append mode. Note that since std::fs::File does not buffer writes (we want writes to propagate to disk as soon as possible), calling self.file.flush() is a no-op in Rust, but it is good practice to ensure that all data is written to the OS.

However, there is a problem with this approach. fsync()/fdatasync() is a costly blocking operation. It may take anywhere between 3 ms and 20 ms to complete, depending on the underlying hardware [3]. If we call fsync() for every write request, and the syncs on a WAL file run one after another, throughput is capped at roughly 50 to 330 writes per second (1000 ms divided by the sync latency). This is not satisfactory for a database system.

We could use tokio::task::spawn_blocking to offload the blocking fsync() call to a separate Tokio-managed thread, but this approach has its own drawbacks. While Tokio’s blocking thread pool has a high default limit, excessive concurrent write requests can still lead to increased context switching and scheduling overhead, which may impact performance. Additionally, if the WAL file is stored on a network file system (NFS), fsync() may take much longer to complete, causing blocking threads to remain occupied for extended periods. This can increase latency for new blocking tasks, as they may have to wait for available threads.

One solution is to use an async implementation of std::fs, so that calling fdatasync() would not block the entire Tokio worker thread. There is the async-fs crate, which uses the blocking crate to offload blocking I/O to a separate thread pool. For true async file I/O, there’s tokio-uring, which uses Linux’s io_uring interface. However, tokio-uring requires a separate runtime from Tokio, and it is only compatible with a very recent Linux kernel (5.11.0 at the time of writing).

Another promising approach is to use separate background threads to handle WAL sync. Each write request would write the WAL entry to the OS’s page cache with write(), and one or more separate threads would flush to disk with fdatasync(), either periodically or on demand. This approach has the benefit of batching multiple WAL entries into a single fdatasync() call, which improves throughput. This is what Hieu Pham did in this PR, spawning a separate group of threads for WAL syncing that wakes up every 100 ms to sync the WAL files.

However, there is substantial overhead in switching between threads in a high-throughput networking service. You also have to be careful with how you wake up the sync threads. If the sync threads wake up too often, you end up calling fdatasync() too frequently, which is wasteful. If they wake up too infrequently, write requests see high latency (because a write request returns only once its WAL entry is synced to disk). This trade-off between throughput and latency also exists with a threshold-based heuristic (e.g., wake up the sync threads when there are at least N unsynchronized WAL entries).
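
The background-sync idea can be sketched with the standard library alone. This is not the code from the PR mentioned above: `BackgroundSyncer`, `mark_dirty`, and `append_entry` are hypothetical names, and the sketch uses a single thread with a dirty flag rather than a group of threads. It also leaves out acknowledging writers after the sync, which a real implementation would need.

```rust
use std::fs::File;
use std::io::{self, Write};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical background syncer: wakes up every `interval` and calls
/// `sync_data()` only if something was written since the last sync.
struct BackgroundSyncer {
    dirty: Arc<AtomicBool>,
    stop: Arc<AtomicBool>,
    syncs: Arc<AtomicUsize>,
    handle: JoinHandle<io::Result<()>>,
}

impl BackgroundSyncer {
    fn start(file: File, interval: Duration) -> Self {
        let dirty = Arc::new(AtomicBool::new(false));
        let stop = Arc::new(AtomicBool::new(false));
        let syncs = Arc::new(AtomicUsize::new(0));
        let (d, s, n) = (dirty.clone(), stop.clone(), syncs.clone());
        let handle = thread::spawn(move || -> io::Result<()> {
            loop {
                // Read the stop flag first so the final pass still syncs pending writes.
                let stopping = s.load(Ordering::Acquire);
                if d.swap(false, Ordering::AcqRel) {
                    file.sync_data()?; // one sync covers every write since the last one
                    n.fetch_add(1, Ordering::Relaxed);
                }
                if stopping {
                    return Ok(());
                }
                thread::sleep(interval);
            }
        });
        BackgroundSyncer { dirty, stop, syncs, handle }
    }

    /// Writers call this after `write_all` to request a sync on the next wake-up.
    fn mark_dirty(&self) {
        self.dirty.store(true, Ordering::Release);
    }

    /// Stop the thread after one last sync pass and return how many syncs ran.
    fn shutdown(self) -> io::Result<usize> {
        self.stop.store(true, Ordering::Release);
        self.handle.join().expect("sync thread panicked")?;
        Ok(self.syncs.load(Ordering::Relaxed))
    }
}

/// Write path: hand the bytes to the OS, then leave the sync to the background thread.
fn append_entry(file: &mut File, syncer: &BackgroundSyncer, bytes: &[u8]) -> io::Result<()> {
    file.write_all(bytes)?;
    syncer.mark_dirty();
    Ok(())
}
```

The `interval` argument is where the trade-off shows up: a short interval means more sync calls, a long one means entries sit unsynced for longer.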

What we need is an approach that:

  1. does not use extra threads,
  2. batches multiple WAL entries into a single fdatasync() call, and
  3. handles timeouts.

WAL Write Groups

Basic Idea

Hieu Pham suggested that I use a “write group” approach, inspired by RocksDB. The idea is that incoming threads put their write requests into a queue. The first enqueued thread becomes the leader and batches together consecutive followers in the queue into a group under a mutex. The leader thread writes all of the WAL entries from all threads in the group (including its own) to the WAL file, and then performs WAL sync once. All other threads in the group (the followers) just wait for the leader to finish syncing, and then return. In our case, we have Tokio tasks instead of threads, so I had to implement this with async primitives. The write group must have a maximum size and be able to handle timeouts. Here’s what I came up with:

use std::sync::Arc;
use tokio::sync::Mutex as AsyncMutex;

/// Represents an entry for a follower task
struct GroupEntry {
    ...
}

/// Represents a group that the leader will own and use to append to the WAL and sync the WAL
struct WalWriteGroup {
    max_num_entries: usize,
    entries: Vec<GroupEntry>,
}

/// Coordinator to keep the current open `WalWriteGroup`
struct WalWriteCoordinator {
    current_group: Option<WalWriteGroup>,
    wal_write_group_size: usize,
}

struct Collection {
    ...
    write_coordinator: Arc<AsyncMutex<WalWriteCoordinator>>,
}

The Collection struct represents a vector collection in MuopDB. Each collection has its own write_coordinator, which manages the current write group. The WalWriteCoordinator struct keeps track of the current write group and the write group size, which is the maximum size of a write group. This struct is wrapped in an Arc<AsyncMutex<...>> so that multiple Tokio tasks can share it, with only one task holding the lock at a time.

Every WAL write task goes through this basic flow:

  1. Lock the coordinator and take the current group if it exists; otherwise, create a new one.
  2. Check the number of entries in the group.
  3. If the group has reached max_num_entries, this task becomes the group’s leader. The leader takes ownership of the group, leaves a fresh empty group in the coordinator, drops the lock, and starts processing the group.
  4. Otherwise, this task becomes a follower. The follower adds its WAL entry to the group, puts the group back into the coordinator, drops the lock, and waits for the leader.
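
The steps above can be modeled without any locking or I/O. The sketch below is a simplified, synchronous stand-in: `Entry`, `Group`, `Coordinator`, `Role`, and `join` are hypothetical names, and it assumes that a group closes once it already holds `max_num_entries` follower entries. In the model, the leader's own entry is placed last in the batch it takes away.

```rust
/// Stand-in for a follower's WAL entry.
#[derive(Debug, PartialEq)]
struct Entry(u64);

struct Group {
    max_num_entries: usize,
    entries: Vec<Entry>,
}

impl Group {
    /// Assumption: the group is full once it holds `max_num_entries` followers.
    fn should_close(&self) -> bool {
        self.entries.len() >= self.max_num_entries
    }
}

struct Coordinator {
    current_group: Option<Group>,
    group_size: usize,
}

/// What the calling task turned into after joining.
enum Role {
    /// The task leads and owns the whole batch (its own entry is last).
    Leader(Vec<Entry>),
    /// The task joined the open group and would now wait for a leader.
    Follower,
}

impl Coordinator {
    fn new_group(&self) -> Group {
        Group { max_num_entries: self.group_size, entries: Vec::new() }
    }

    /// In the real code this runs while holding the coordinator's async mutex.
    fn join(&mut self, entry: Entry) -> Role {
        // Step 1: take the current group, or create one.
        let mut group = match self.current_group.take() {
            Some(group) => group,
            None => self.new_group(),
        };
        // Steps 2 and 3: a full group makes this task the leader.
        if group.should_close() {
            self.current_group = Some(self.new_group());
            group.entries.push(entry);
            Role::Leader(group.entries)
        } else {
            // Step 4: otherwise join as a follower and put the group back.
            group.entries.push(entry);
            self.current_group = Some(group);
            Role::Follower
        }
    }
}
```

With a group size of 2, the first two tasks become followers and the third one leads a batch of three entries.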

Sequence Number Handling

Each WAL append returns a sequence number seq_no from the WalFile::append() method. That means each follower has its own seq_no, and the leader has its own seq_no too. The leader needs a way to send the correct seq_no to the correct follower. Luckily, Tokio provides the tokio::sync::oneshot channel, which is perfect for this use case. Each follower creates a oneshot channel, puts the sender in its GroupEntry, and keeps the receiver for itself. The leader therefore holds all of the senders through its WalWriteGroup. After the leader finishes syncing the WAL entries, it sends the correct seq_no to each follower through their respective oneshot channels. The followers wait for the seq_no from the leader and return it. Now the GroupEntry struct looks like this:

use tokio::sync::oneshot;

struct AppendArgs {
    doc_ids: Arc<[u128]>,
    user_ids: Arc<[u128]>,
    op_type: WalOpType<Arc<[f32]>>,
}

/// Represents an entry for a follower task
struct GroupEntry {
    /// follower's WAL entry data to append to WAL
    args: AppendArgs,
    /// sender to send sequence number to the corresponding follower
    seq_tx: oneshot::Sender<u64>,
}
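
To see the hand-off in isolation, here is a small standard-library model of it. It is only a sketch: `std::sync::mpsc` channels and OS threads stand in for `tokio::sync::oneshot` and Tokio tasks, an in-memory `Vec<u8>` stands in for the WAL file, and `Entry`, `lead`, and `run_group` are hypothetical names.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

/// Stand-in for `GroupEntry`: a payload plus the sender half of a per-follower channel.
struct Entry {
    payload: Vec<u8>,
    seq_tx: Sender<u64>,
}

/// Hypothetical leader step: "append" every entry, "sync" once, then reply.
/// Returns how many followers were still listening.
fn lead(entries: Vec<Entry>, start_seq_no: u64, log: &mut Vec<u8>) -> usize {
    let mut replies = Vec::with_capacity(entries.len());
    let mut seq_no = start_seq_no;
    for entry in entries {
        log.extend_from_slice(&entry.payload);
        seq_no += 1;
        replies.push((seq_no, entry.seq_tx));
    }
    // A real leader would call fdatasync() here, once for the whole batch.
    let mut delivered = 0;
    for (seq_no, seq_tx) in replies {
        // A send error means the follower stopped waiting; this sketch just skips it.
        if seq_tx.send(seq_no).is_ok() {
            delivered += 1;
        }
    }
    delivered
}

/// Spawn `n` followers that each wait on their own receiver, then run the leader.
fn run_group(n: usize) -> Vec<u64> {
    let mut entries = Vec::new();
    let mut followers = Vec::new();
    for i in 0..n {
        let (seq_tx, seq_rx): (Sender<u64>, Receiver<u64>) = channel();
        entries.push(Entry { payload: vec![i as u8], seq_tx });
        followers.push(thread::spawn(move || {
            seq_rx.recv().expect("leader dropped the sender")
        }));
    }
    let mut log = Vec::new();
    lead(entries, 0, &mut log);
    followers
        .into_iter()
        .map(|h| h.join().expect("follower panicked"))
        .collect()
}
```

Each follower gets back the sequence number of its own entry, in the order the entries were queued.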

Now the leader’s job should look like this:

  
// A closure to append to WAL, send operation to ingestion threads, and return seq_no
let append_wal = async |wal: &RwLock<Wal>, args: AppendArgs| -> Result<u64> {
  ...
};

// A closure to run as the leader
let write_follower_entries = |group: WalWriteGroup, leader_args: AppendArgs| async move {
    // Go through all entries in order and collect seq_no results
    let mut results: Vec<(u64, oneshot::Sender<u64>)> = vec![];
    for entry in group.entries {
        let seq_no = append_wal(wal, entry.args).await?;
        results.push((seq_no, entry.seq_tx));
    }

    // Finally append the leader's own entry
    let leader_seq_no = append_wal(wal, leader_args).await?;

    // Sync the WAL at once
    let entries_synced = self.sync_wal()?;

    // Now send seq_no to all followers
    for (seq_no, seq_tx) in results {
        seq_tx
            .send(seq_no)
            .expect("WAL Leader: follower's receiver dropped");
    }

    // Return the leader's seq_no
    Ok(leader_seq_no)
};

Putting it all together, the logic in the write_to_wal function in the Collection struct looks like this:

impl Collection {
    pub async fn write_to_wal(
        &self,
        doc_ids: Arc<[u128]>,
        user_ids: Arc<[u128]>,
        wal_op_type: WalOpType<Arc<[f32]>>,
    ) -> Result<u64> {
        // Lock the write coordinator and take out the current group
        let mut coordinator = self.write_coordinator.lock().await;
        let mut current_group = match coordinator.current_group.take() {
            Some(group) => group,
            // Create a new group if current group is None
            None => coordinator.new_wal_write_group(),
        };

        let append_wal = async |wal: &RwLock<Wal>, args: AppendArgs| -> Result<u64> {
          ...
        };
        let write_follower_entries = |group: WalWriteGroup, leader_args: AppendArgs| async move {
            ...
        };

        if current_group.should_close() {
            // This task will be the leader. Own the current group and put a new group in the coordinator.
            coordinator.current_group = Some(coordinator.new_wal_write_group());
            drop(coordinator);

            // Write all follower entries to WAL, including this leader's entry
            let leader_seq_no = write_follower_entries(
                current_group,
                AppendArgs {
                    doc_ids,
                    user_ids,
                    op_type: wal_op_type,
                },
            )
            .await?;

            Ok(leader_seq_no)
        } else {
            // This task will be the follower and join the current group.
            let (seq_tx, seq_rx) = oneshot::channel();

            // Create a new entry in the current group
            current_group.entries.push(GroupEntry {
                args: AppendArgs {
                    doc_ids,
                    user_ids,
                    op_type: wal_op_type,
                },
                seq_tx,
            });

            // Put the current group back to the coordinator
            coordinator.current_group = Some(current_group);
            drop(coordinator);

            // Wait for leader completion
            let follower_seq_no = seq_rx.await?;
            Ok(follower_seq_no)
        }
    }
}

Handling Timeout

Now we need to handle timeouts in the follower tasks. Without one, a group that never reaches max_num_entries never gets a leader, and its followers wait forever. To solve this, we can use tokio::time::timeout to set a maximum wait time for a follower. If the timeout expires, the first follower becomes the leader and processes the current group immediately. The timeout logic in the follower looks like this:

  
use std::time::Duration;

const TIMEOUT_DURATION: Duration = Duration::from_millis(10);

// This task will be the follower and join the current group.
let (seq_tx, mut seq_rx) = oneshot::channel();

// Get the 0-based index of this entry in the group
let follower_entry_id = current_group.entries.len();

// Create a new entry in the current group
current_group.entries.push(GroupEntry {
    args: AppendArgs {
        doc_ids: doc_ids.clone(),
        user_ids: user_ids.clone(),
        op_type: wal_op_type.clone(),
    },
    seq_tx,
});

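let is_first_follower = follower_entry_id == 0;

// Put the current group back into the coordinator and release the lock before waiting
coordinator.current_group = Some(current_group);
drop(coordinator);

// Follower waits for leader, but can become leader on timeout.
// Borrow the receiver so that it can still be awaited after a timeout.
let result = tokio::time::timeout(TIMEOUT_DURATION, &mut seq_rx).await;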
match result {
    Ok(Ok(seq_no)) => Ok(seq_no), // got seq_no from leader
    Ok(Err(_)) => Err(...),      // leader dropped
    Err(_) => {
        // Timeout: check if I'm the first follower
        if is_first_follower {
            // Become leader, process group, notify others
            // ... process group ...
            let my_seq_no = seq_rx.await?;
            Ok(my_seq_no)
        } else {
            // Not first: keep waiting for leader
            let seq_no = seq_rx.await?;
            Ok(seq_no)
        }
    }
}

For my implementation though, I used tokio::select! to implement the timeout logic, which looks like this:

  
// Wait for either leader completion OR timeout
tokio::select! {
    result = &mut seq_rx => {
        // Got seq_no from leader
        let follower_seq_no = result.expect("WAL follower: leader's sender dropped");
        Ok(follower_seq_no)
    }
    _ = tokio::time::sleep(TIMEOUT_DURATION) => {
        // Timeout: check if I'm the first follower
        if is_first_follower {
            // Become leader, process group, notify others
            // ... process group ...
            let my_seq_no = seq_rx.await.expect("WAL follower: leader's sender dropped");
            Ok(my_seq_no)
        } else {
            // Not first: keep waiting for leader
            let seq_no = seq_rx.await.expect("WAL follower: leader's sender dropped");
            Ok(seq_no)
        }
    }
}

In hindsight, I think tokio::time::timeout would be simpler for this use case.
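
Either way, the follower ends up choosing between the same few outcomes once its wait ends. The sketch below writes that decision down using only the standard library: the blocking `recv_timeout` on an `mpsc` receiver stands in for the async timeout on a oneshot receiver, and `WaitOutcome` and `wait_for_leader` are hypothetical names.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// What a follower should do once its wait ends.
#[derive(Debug, PartialEq)]
enum WaitOutcome {
    /// The leader replied in time with this follower's sequence number.
    Done(u64),
    /// Timed out, and this is the first follower: take over as leader.
    BecomeLeader,
    /// Timed out, but another task will lead: go back to waiting.
    KeepWaiting,
    /// The sender was dropped without a reply.
    LeaderGone,
}

fn wait_for_leader(
    seq_rx: &Receiver<u64>,
    timeout: Duration,
    is_first_follower: bool,
) -> WaitOutcome {
    match seq_rx.recv_timeout(timeout) {
        Ok(seq_no) => WaitOutcome::Done(seq_no),
        Err(RecvTimeoutError::Timeout) if is_first_follower => WaitOutcome::BecomeLeader,
        Err(RecvTimeoutError::Timeout) => WaitOutcome::KeepWaiting,
        Err(RecvTimeoutError::Disconnected) => WaitOutcome::LeaderGone,
    }
}
```

Only the first follower is promoted, so a group that timed out still gets exactly one leader.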

Benchmarking Results

I benchmarked the WAL write implementation in write_to_wal using the criterion crate, comparing three different approaches:

  1. sync-on-write - Original implementation: each write request calls fdatasync() directly in the Tokio task.
  2. sync-background - Hieu Pham’s implementation: a separate sync thread wakes up every 100 ms to call fdatasync() if there are unsynced WAL entries.
  3. sync-on-batch - My implementation: write groups with a maximum size of 940 followers, and a timeout of 10 ms.

The benchmark code spawns 1000 concurrent write_to_wal tasks, each writing a WAL entry with 1 document ID, 1 user ID, and a vector of dimension 128. Here’s the benchmark result from my MacBook Pro 2021 (M1, 10-core CPU, 16GB RAM, 512GB SSD):

Group                            sync-on-write       sync-background        sync-on-batch
WalInsertion/WalInsertion/1000   116.47  4.3±0.20s   9.02  335.5±84.68ms    1.00  37.2±0.51ms

The sync-on-batch approach is about 9x faster than the sync-background approach, and more than 100x faster than the original sync-on-write approach. Hieu Pham’s benchmarks on Linux showed better performance for sync-background (166 ms) and slightly better performance for sync-on-batch (32 ms), so on his machine sync-on-batch is roughly 5x faster than sync-background. I think the difference is due to the underlying hardware and file system. Either way, the sync-on-batch approach brought a significant improvement in WAL write throughput.

Reflection

It was a fun challenge to implement WAL write groups under the Tokio async context. It took me a few iterations to get the design right, especially with the concurrent access to the write coordinator and handling timeouts. After that, the actual implementation was fairly straightforward. In my implementation, the timeout duration and group size are two tunable parameters that can be adjusted to balance throughput against latency.

In the first benchmark run, I set the timeout to 100 ms, which turned out to be the bottleneck. Since the WAL group size is 940 and the total number of tasks is 1000, the last group only has 60 entries. With a timeout of 100 ms, the last group took 100 ms to complete, which made the total time around 130 ms on my laptop. After reducing the timeout to 10 ms, the total time dropped to around 37 ms. I also experimented with different group sizes (1 to 1000), and found the best performance at 940.
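
The arithmetic behind that leftover group is small enough to check in code. This is only a back-of-the-envelope model: `split_into_groups` and `leftover_wait_ms` are hypothetical helpers, a full group is counted as 940 tasks as in the paragraph above, and the wait is a lower bound that ignores the time spent writing and syncing.

```rust
/// Split `total_tasks` into (number of full groups, size of the leftover group).
/// Assumption: a full group is counted as `group_size` tasks.
fn split_into_groups(total_tasks: usize, group_size: usize) -> (usize, usize) {
    (total_tasks / group_size, total_tasks % group_size)
}

/// Lower bound on the extra wait caused by a leftover group: it never fills up,
/// so it only gets a leader when the timeout fires.
fn leftover_wait_ms(total_tasks: usize, group_size: usize, timeout_ms: u64) -> u64 {
    let (_, leftover) = split_into_groups(total_tasks, group_size);
    if leftover == 0 {
        0
    } else {
        timeout_ms
    }
}
```

With 1000 tasks and a group size of 940, one full group closes on its own and the remaining 60 tasks have to wait out the timeout, which is why lowering the timeout from 100 ms to 10 ms made such a difference.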

Before my implementation, the data, user IDs, and doc IDs were passed as slices (&[T] type). With the write group approach, we would need lifetime annotations to ensure that the data lives long enough for the leader to process it. Also, the slices might need to be accessed concurrently by multiple tasks. To avoid lifetime issues, I changed the data types to Arc<[T]>, which adds some overhead due to atomic reference counting, but I think it’s worth it for the simplicity. At least it’s better than copying the data for every write request 🙂. I got this idea from this YouTube video by Logan Smith.
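
A minimal sketch of what Arc<[T]> buys here, using only the standard library (`share` and `sum_on_other_thread` are hypothetical helpers, and an OS thread stands in for the leader task): the data is copied once when the Arc is created, and every later clone is just a reference-count bump that can be moved to another task without lifetime annotations.

```rust
use std::sync::Arc;
use std::thread;

/// Copy a borrowed slice once into a shared, immutable `Arc<[f32]>`.
fn share(vector: &[f32]) -> Arc<[f32]> {
    Arc::from(vector)
}

/// Move an `Arc<[f32]>` into another thread and read it there.
/// No lifetime annotations are needed because the thread owns its handle.
fn sum_on_other_thread(data: Arc<[f32]>) -> f32 {
    thread::spawn(move || data.iter().sum::<f32>())
        .join()
        .expect("worker panicked")
}
```

Cloning the Arc does not copy the vector: both handles point at the same allocation, and the count drops back once the other thread is done with its handle.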

This was a great learning experience for me in systems programming in Rust. I want to give a shout-out to Hieu Pham for suggesting this idea and helping review my PRs. If you’re interested, you can check out my first iteration of the implementation in this PR. Hope you got something useful out of this post!

Footnotes

  1. From section 40.7, Caching and Buffering, in the OSTEP book

  2. Technically, fdatasync() is preferred over fsync() because it flushes the file data but skips metadata that is not needed to read the data back (such as timestamps), which is faster. We use std::fs::File::sync_data() in Rust, which maps to fdatasync() on Linux.

  3. Based on the results in this post