Summary

During atomic slot migration in Valkey cluster mode, the slotMigrationPipeReadHandler() function contains an unbounded while(1) loop. Each invocation is short, but the handler is re-triggered so frequently that, cumulatively, it can monopolize the main thread and starve regular client requests. The issue becomes severe when migrating large slots (e.g., 10GB+), where the estimates below put main-thread occupancy at roughly 65%.

Environment

  • Valkey Version: 9.0.0 (issue likely affects other versions)
  • Cluster Mode: Enabled
  • Migration Type: Atomic slot migration (CLUSTER MIGRATESLOTS)

Problem Description

The slotMigrationPipeReadHandler() function (src/replication.c:1846) uses an unbounded while(1) loop to read data from a pipe and write it to the target node. While each individual invocation exits relatively quickly (after ~4 iterations when the pipe is emptied), the event handler is repeatedly triggered thousands of times during large data migrations, leading to cumulative main thread monopolization.

Code Analysis

void slotMigrationPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask) {
    // ...
    while (1) {  // ⚠️ Unbounded loop
        // Read up to 16KB from pipe
        server.slot_migration_pipe_bufflen = read(fd, server.slot_migration_pipe_buff, PROTO_IOBUF_LEN);

        if (server.slot_migration_pipe_bufflen < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) return;  // Exit point 1: pipe empty
            // Error handling...
            return;
        }

        if (server.slot_migration_pipe_bufflen == 0) return;  // Exit point 2: EOF

        // Write to target node
        ssize_t nwritten = connWrite(conn, server.slot_migration_pipe_buff,
                                      server.slot_migration_pipe_bufflen);

        if (nwritten != server.slot_migration_pipe_bufflen) {
            // Exit point 3: Network buffer full or partial write
            connSetWriteHandler(conn, slotMigrationPipeWriteHandler);
            aeDeleteFileEvent(server.el, server.slot_migration_pipe_read, AE_READABLE);
            return;
        }

        // If all above conditions pass, continue looping
    }
}

Loop Exit Conditions

The loop exits when:

  1. Pipe is empty (read() returns EAGAIN)
  2. Pipe is closed (read() returns 0)
  3. Network buffer is full (write() returns less than requested)

However, on high-speed networks, the TCP send buffer rarely fills up, and the loop primarily exits due to pipe emptiness. The child process continuously refills the pipe, causing the event handler to be triggered repeatedly.
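
To make the re-triggering mechanics concrete, the sketch below shows how such a pipe is typically wired into the ae event loop. This is a hedged illustration based on the standard ae API, not a quote from cluster_migrateslots.c, and the helper name setupSlotMigrationPipe is hypothetical; the key point is that the AE_READABLE registration is level-triggered, so the handler fires again on every event loop cycle in which the child has refilled the pipe.

/* Hypothetical helper (illustration only): wire the migration pipe into the
 * event loop. The read end is non-blocking so the handler's read() returns
 * EAGAIN instead of stalling the main thread, and the AE_READABLE event is
 * level-triggered, so it re-fires whenever unread data remains in the pipe. */
int setupSlotMigrationPipe(void) {
    int pipefds[2];
    if (pipe(pipefds) == -1) return C_ERR;

    fcntl(pipefds[0], F_SETFL, O_NONBLOCK); /* parent reads, child writes */
    server.slot_migration_pipe_read = pipefds[0];

    if (aeCreateFileEvent(server.el, pipefds[0], AE_READABLE,
                          slotMigrationPipeReadHandler, NULL) == AE_ERR) {
        return C_ERR;
    }
    return C_OK;
}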

Impact Analysis

Quantitative Analysis for 10GB Slot Migration

Total data size: 10GB
Pipe buffer per round: 64KB (typical)
Total rounds: 10GB / 64KB = 163,840 rounds

Iterations per round: ~4 (until pipe is empty)
Total loop iterations: 163,840 × 4 = 655,360 iterations

Time per iteration: ~10μs (read + write syscalls)
Cumulative time in while(1): 655,360 × 10μs ≈ 6.5 seconds

If migration completes in 10 seconds:
Main thread occupancy: 6.5 / 10 = 65% 🚨
Time available for client requests: 35%
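
The figures above can be reproduced with a small standalone calculation. The following C program is a back-of-the-envelope sketch using only the assumptions quoted in this section (16KB reads, 64KB refilled per wakeup, ~10μs per iteration, a 10-second migration); it illustrates the arithmetic and is not measured data.

/* Back-of-the-envelope estimate of main-thread occupancy during a slot
 * migration, using the figures quoted above. Compile with: cc -o est est.c */
#include <stdio.h>

int main(void) {
    const double slot_bytes = 10.0 * 1024 * 1024 * 1024; /* 10GB slot */
    const double pipe_round_bytes = 64.0 * 1024;         /* 64KB refilled per wakeup */
    const double read_bytes = 16.0 * 1024;               /* PROTO_IOBUF_LEN per read() */
    const double usec_per_iter = 10.0;                   /* read + write syscalls */
    const double migration_secs = 10.0;                  /* assumed total migration time */

    double rounds = slot_bytes / pipe_round_bytes;          /* handler wakeups */
    double iters_per_round = pipe_round_bytes / read_bytes; /* ~4 */
    double total_iters = rounds * iters_per_round;
    double busy_secs = total_iters * usec_per_iter / 1e6;

    printf("wakeups: %.0f, iterations: %.0f\n", rounds, total_iters);
    printf("time in while(1): %.2fs -> occupancy %.1f%% of a %.0fs migration\n",
           busy_secs, 100.0 * busy_secs / migration_secs, migration_secs);
    return 0;
}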

Impact Table

Slot Size | Total Iterations | Cumulative Time | Main Thread Occupancy | Client Latency Increase
1GB       | 65,536           | 0.65s           | ~30%                  | ~1.4x
10GB      | 655,360          | 6.5s            | ~65%                  | ~2.8x
100GB     | 6,553,600        | 65s             | ~65%                  | ~2.8x

Root Cause

1. Lack of Iteration Limit

Unlike the replication code, which uses REPL_MAX_READS_PER_IO_EVENT = 25 to prevent starvation (src/networking.c:4109), slot migration has no such protection:

// Replication has protection (src/networking.c:4109)
#define REPL_MAX_READS_PER_IO_EVENT 25

void readQueryFromClient(connection *conn) {
    int iter = 0;
    do {
        bool full_read = readToQueryBuf(c);
        // ...
        repeat = (c->flag.primary &&
                  !c->flag.close_asap &&
                  ++iter < REPL_MAX_READS_PER_IO_EVENT &&  // ⚠️ Iteration limit
                  full_read);
    } while (repeat);
}

The comment explicitly states (src/networking.c:4115):

"For the primary, if we've processed queries for long enough, break the loop even if data remains. This allows us to process other events such as I/O and timer, so we don't starve other clients and we don't run into issues with our output buffer size."

Slot migration lacks this protection.

2. TCP Buffer Rarely Fills on High-Speed Networks

High-speed network (10Gbps):
  Time to clear 128KB buffer: 128KB / 1250MB/s ≈ 100μs

One round of while(1) (4 iterations):
  Duration: ~40μs
  Data written: 64KB

During these 40μs, the kernel drains: 1250MB/s × 40μs ≈ 50KB
TCP send buffer occupancy: 64KB written - 50KB drained ≈ 14KB of 128KB (far from full)

Result: Loop exits primarily due to pipe emptiness, not TCP buffer fullness
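
To verify this empirically rather than by arithmetic, one can sample how full the socket's send queue actually is during a migration. A hedged, Linux-only sketch follows: SIOCOUTQ reports bytes queued but not yet sent, SO_SNDBUF reports the allocated buffer size (which on Linux includes kernel overhead, so the percentage is approximate), and the helper name and the fd argument are illustrative.

/* Linux-only diagnostic sketch: print the send-queue usage of a connected TCP
 * socket (e.g., the migration link). SIOCOUTQ = bytes queued but not yet sent. */
#include <stdio.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <linux/sockios.h>

static void printSendQueueUsage(int fd) {
    int queued = 0, sndbuf = 0;
    socklen_t len = sizeof(sndbuf);

    if (ioctl(fd, SIOCOUTQ, &queued) == -1) {
        perror("ioctl(SIOCOUTQ)");
        return;
    }
    if (getsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, &len) == -1) {
        perror("getsockopt(SO_SNDBUF)");
        return;
    }
    printf("send queue: %d of %d bytes (~%.0f%% full)\n",
           queued, sndbuf, sndbuf > 0 ? 100.0 * queued / sndbuf : 0.0);
}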

3. Repeated Event Triggering

T=0ms:    Pipe readable → while(1) runs 4 times → pipe empty → exit
T=0.1ms:  Child process writes 64KB → pipe readable → while(1) runs 4 times → exit
T=0.2ms:  Child process writes 64KB → pipe readable → while(1) runs 4 times → exit
...
Repeated 163,840 times for 10GB migration!

Proposed Solutions

Solution 1: Add Iteration Limit (Recommended)

Similar to replication code, add a maximum iteration limit:

#define SLOT_MIGRATION_MAX_READS_PER_IO_EVENT 10

void slotMigrationPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask) {
    UNUSED(mask);
    UNUSED(clientData);
    UNUSED(eventLoop);

    if (!server.slot_migration_pipe_buff)
        server.slot_migration_pipe_buff = zmalloc(PROTO_IOBUF_LEN);
    if (!server.slot_migration_pipe_conn) return;

    int iter = 0;  // ⚠️ Add iteration counter

    while (iter < SLOT_MIGRATION_MAX_READS_PER_IO_EVENT) {  // ⚠️ Add limit
        server.slot_migration_pipe_bufflen = read(fd, server.slot_migration_pipe_buff, PROTO_IOBUF_LEN);

        if (server.slot_migration_pipe_bufflen < 0) {
            if (errno == EAGAIN || errno == EWOULDBLOCK) return;
            serverLog(LL_WARNING, "Slot migration, read error sending snapshot to target: %s", strerror(errno));
            client *target = connGetPrivateData(server.slot_migration_pipe_conn);
            freeClient(target);
            server.slot_migration_pipe_conn = NULL;
            return;
        }

        if (server.slot_migration_pipe_bufflen == 0) {
            aeDeleteFileEvent(server.el, server.slot_migration_pipe_read, AE_READABLE);
            close(server.slot_migration_child_exit_pipe);
            server.slot_migration_child_exit_pipe = -1;
            return;
        }

        ssize_t nwritten;
        connection *conn = server.slot_migration_pipe_conn;
        client *target = connGetPrivateData(conn);

        if ((nwritten = connWrite(conn, server.slot_migration_pipe_buff,
                                  server.slot_migration_pipe_bufflen)) == -1) {
            if (connGetState(conn) != CONN_STATE_CONNECTED) {
                serverLog(LL_WARNING, "Slot migration transfer, write error sending DB to target: %s",
                          connGetLastError(conn));
                freeClient(target);
                server.slot_migration_pipe_conn = NULL;
                return;
            }
            target->repl_data->repldboff = 0;
        } else {
            target->repl_data->repldboff = nwritten;
            server.stat_net_cluster_slot_export_bytes += nwritten;
        }

        if (nwritten != server.slot_migration_pipe_bufflen) {
            connSetWriteHandler(conn, slotMigrationPipeWriteHandler);
            aeDeleteFileEvent(server.el, server.slot_migration_pipe_read, AE_READABLE);
            return;
        }

        iter++;  // ⚠️ Increment counter
    }

    // Limit reached but pipe may still have data
    // Event loop will re-trigger this handler in the next iteration
}

Benefits:

  • Limits each event handler invocation to 10 iterations (160KB max)
  • Allows other client requests to be processed between iterations
  • Migration continues in subsequent event loop cycles
  • Does not significantly impact migration speed

Solution 2: Time-Based Limit

void slotMigrationPipeReadHandler(...) {
    long long start = ustime();

    while (1) {
        // ... existing read/write logic ...

        // Exit if processing time exceeds 500μs
        if (ustime() - start > 500) {
            return;  // Yield to other requests
        }
    }
}

Solution 3: Configurable Limit

Add configuration options:

# In valkey.conf
slot-migration-max-reads-per-event 10
# Time budget per event, in microseconds
slot-migration-max-time-per-event 500
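
If those options were added, the handler's loop guard could consume them directly, combining the iteration bound from Solution 1 with the time budget from Solution 2. A hedged sketch, assuming hypothetical server fields slot_migration_max_reads_per_event and slot_migration_max_time_per_event that the config system would populate (neither exists today):

void slotMigrationPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask) {
    UNUSED(mask);
    UNUSED(clientData);
    UNUSED(eventLoop);

    long long start = ustime();
    int iter = 0;

    /* Both limits are hypothetical config-backed fields standing in for the
     * options proposed above. */
    while (iter < server.slot_migration_max_reads_per_event) {
        /* ... existing read/write logic from Solution 1 ... */

        iter++;
        if (ustime() - start > server.slot_migration_max_time_per_event) {
            return; /* time budget exhausted; yield to other clients */
        }
    }
    /* Iteration budget exhausted; the readable event stays registered, so the
     * event loop re-invokes this handler on the next cycle. */
}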

Comparison with Replication Code

Feature                | Replication                     | Slot Migration
Per-event limit        | ✅ Max 25 iterations            | ❌ Unbounded (exits at ~4 but triggers repeatedly)
Total iterations       | Limited                         | ⚠️ Thousands of times
Starvation protection  | ✅ Explicit                     | ❌ None
Comment explanation    | ✅ "Don't starve other clients" | ❌ No protection mentioned

Code Locations

  • src/replication.c:1846 - slotMigrationPipeReadHandler() - unbounded loop
  • src/networking.c:4109 - REPL_MAX_READS_PER_IO_EVENT definition
  • src/networking.c:4115 - Comment explaining starvation prevention
  • src/cluster_migrateslots.c:1596 - Pipe creation with O_NONBLOCK

Reproduction Steps

  1. Set up a Valkey cluster with 2+ nodes
  2. Populate a slot with a large dataset (e.g., 10GB)
  3. Start atomic slot migration: CLUSTER MIGRATESLOTS <node-id> <slots>
  4. Monitor client request latency during migration
  5. Observe increased latency for regular client commands

Expected Behavior

Client requests should maintain consistent latency during slot migration.

Actual Behavior

Client request latency increases by 1.4x - 2.8x during large slot migrations due to main thread monopolization.

Workarounds

  • Schedule slot migrations during low-traffic periods
  • Migrate smaller slot ranges
  • Monitor client latency and pause migrations if needed

Additional Context

This issue becomes more severe with:

  • High-speed networks (10Gbps+)
  • Large slot sizes (10GB+)
  • In-memory datasets (faster child process generation)
  • Low-latency applications sensitive to microsecond delays

Request

Could the maintainers consider adding an iteration limit similar to the replication code to prevent client starvation during slot migrations? I'm happy to submit a PR if this approach is acceptable.

Thank you for your time and for maintaining this excellent project!

Replies: 1 comment

Hey @twoholes, thanks for the write-up!

The pipe read handler is used to read the snapshot contents being written by the child process and forward them over the wire to the target node. This code is actually the same as what we do during the replication snapshot here:

valkey/src/replication.c

Lines 1778 to 1787 in a571ac5

/* Called in diskless primary, when there's data to read from the child's rdb pipe */
void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask) {
    UNUSED(mask);
    UNUSED(clientData);
    UNUSED(eventLoop);
    int i;
    if (!server.rdb_pipe_buff) server.rdb_pipe_buff = zmalloc(PROTO_IOBUF_LEN);
    serverAssert(server.rdb_pipe_numconns_writing == 0);
    while (1) {

We can consider adding a bound to the amount of data we read out from the pipe (for both replication and slot migration) in one event loop cycle. In the fullness of time, though, we should really consolidate on the new dual-channel replication protocol; I just filed #2957 for that. With that model, none of this transfer work would consume main-thread resources, since the child process would send directly to the target node.

Let me know if this would resolve your concerns.
