Consumer Group Initialization State Management

This document describes how Korvet tracks which consumer groups have been initialized in Redis Streams.

Problem Statement

Redis Streams consumer groups must be created with an initial offset using XGROUP CREATE. However, the broker doesn’t know what offset to use until the client sends its first FetchRequest.

The client determines the initial offset based on:

  1. Committed offsets (from OffsetFetchRequest)

  2. auto.offset.reset configuration (if no committed offset exists)

  3. ListOffsets API (to resolve "earliest" or "latest" to an actual offset)

The client sends this offset in the fetch_offset field of the first FetchRequest.
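
The resolution order above can be sketched as follows. This is only an illustration of the client-side decision, not Korvet or Kafka client code: the class name InitialOffsetResolver, the resolve method, and the choice to pass the ListOffsets results in as plain parameters are all assumptions made for the example.

```java
import java.util.OptionalLong;

// Hypothetical helper illustrating how a client picks its first fetch_offset.
final class InitialOffsetResolver {

    /**
     * @param committed       committed offset returned by OffsetFetch, if any
     * @param autoOffsetReset value of auto.offset.reset ("earliest" or "latest")
     * @param earliest        offset that ListOffsets reports for "earliest"
     * @param latest          offset that ListOffsets reports for "latest"
     */
    static long resolve(OptionalLong committed, String autoOffsetReset,
                        long earliest, long latest) {
        // 1. A committed offset always wins.
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        // 2 and 3. Otherwise fall back to auto.offset.reset,
        // resolved to a concrete offset via ListOffsets.
        switch (autoOffsetReset) {
            case "earliest":
                return earliest;
            case "latest":
                return latest;
            default:
                throw new IllegalStateException(
                    "No committed offset and auto.offset.reset=" + autoOffsetReset);
        }
    }
}
```

Whatever value this kind of logic produces is what the broker sees as fetch_offset; the broker never sees the inputs that led to it.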

Design Decision

Redis consumer groups are created lazily on the first Fetch request, not during SyncGroup or OffsetFetch.

This approach:

  • ✅ Allows the client to control the initial offset

  • ✅ Avoids hardcoding offset assumptions in the broker

  • ✅ Works correctly with auto.offset.reset=earliest and auto.offset.reset=latest

  • ✅ Handles committed offsets correctly

Implementation

Broker State Tracking

The broker maintains in-memory state to track which (group, topic, partition) combinations have been initialized:

// In ConsumerGroupRegistry
private final Set<String> initializedGroups = ConcurrentHashMap.newKeySet();

public boolean isGroupInitialized(String groupId, String topic, int partition) {
    String key = groupKey(groupId, topic, partition);
    return initializedGroups.contains(key);
}

public void markGroupInitialized(String groupId, String topic, int partition) {
    String key = groupKey(groupId, topic, partition);
    initializedGroups.add(key);
}

private String groupKey(String groupId, String topic, int partition) {
    return groupId + ":" + topic + ":" + partition;
}
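
A standalone variant of the registry can be exercised in isolation. The sketch below repeats the key scheme and adds a tryMarkInitialized method, which is a hypothetical addition and not part of the original class. It relies on Set.add returning true only for the first insertion of a key, so a caller could check and mark in a single step.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Standalone sketch of the registry; tryMarkInitialized is a hypothetical addition.
final class ConsumerGroupRegistrySketch {
    private final Set<String> initializedGroups = ConcurrentHashMap.newKeySet();

    boolean isGroupInitialized(String groupId, String topic, int partition) {
        return initializedGroups.contains(groupKey(groupId, topic, partition));
    }

    // Returns true only for the first caller that marks this key.
    boolean tryMarkInitialized(String groupId, String topic, int partition) {
        return initializedGroups.add(groupKey(groupId, topic, partition));
    }

    private static String groupKey(String groupId, String topic, int partition) {
        return groupId + ":" + topic + ":" + partition;
    }
}
```

With the separate isGroupInitialized and markGroupInitialized calls, two concurrent first fetches may both attempt XGROUP CREATE. That is harmless as long as the "group already exists" error is tolerated.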

First Fetch Request

When FetchHandler receives a Fetch request for a consumer group:

// Check if group is initialized
if (!consumerGroupRegistry.isGroupInitialized(groupId, topic, partition)) {
    // Decode fetch_offset to Redis Stream entry ID
    String entryId = offsetMapper.toEntryId(fetchOffset);

    // Create Redis consumer group
    try {
        commands.xgroupCreate(
            XReadArgs.StreamOffset.from(streamKey, entryId),
            groupId,
            XGroupCreateArgs.Builder.mkstream()
        );
    } catch (RedisBusyException e) {
        // BUSYGROUP: the group already exists (e.g., after a broker restart),
        // so its existing position is preserved
        log.debug("Consumer group {} already exists on {}, using existing position",
                  groupId, streamKey);
    }

    // Mark as initialized
    consumerGroupRegistry.markGroupInitialized(groupId, topic, partition);
}

// Now read using XREADGROUP with >
List<StreamMessage<String, byte[]>> messages = commands.xreadgroup(
    Consumer.from(groupId, consumerId),
    XReadArgs.StreamOffset.lastConsumed(streamKey)
);

Subsequent Fetch Requests

After initialization, all Fetch requests use XREADGROUP with >:

// Group is already initialized - just read
List<StreamMessage<String, byte[]>> messages = commands.xreadgroup(
    Consumer.from(groupId, consumerId),
    XReadArgs.StreamOffset.lastConsumed(streamKey)  // Uses >
);

State Persistence

The initializedGroups set is not persisted to Redis. It’s purely in-memory state.

Broker Restart Behavior

When the broker restarts:

  1. The initializedGroups set is empty

  2. On the first Fetch after restart, the broker attempts XGROUP CREATE

  3. If the group already exists in Redis, XGROUP CREATE fails with a BUSYGROUP error, which the Redis client surfaces as RedisBusyException

  4. The broker catches this exception and preserves the existing group position

  5. The broker marks the group as initialized in memory

This ensures correctness even after broker restarts: the consumer group continues from where it left off.
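
The restart steps above can be simulated without a Redis server. In the sketch below, FakeRedisGroups and BrokerSketch are hypothetical stand-ins written for this example. The fake models only the "fail if the group exists" behavior of XGROUP CREATE and uses IllegalStateException in place of the client's RedisBusyException.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// In-memory stand-in for Redis; it models only XGROUP CREATE semantics.
final class FakeRedisGroups {
    private final Map<String, String> groupPosition = new HashMap<>();

    // Mimics XGROUP CREATE: fails if the group already exists (BUSYGROUP).
    void xgroupCreate(String streamKey, String groupId, String entryId) {
        String key = streamKey + "|" + groupId;
        if (groupPosition.containsKey(key)) {
            throw new IllegalStateException(
                "BUSYGROUP Consumer Group name already exists");
        }
        groupPosition.put(key, entryId);
    }

    String position(String streamKey, String groupId) {
        return groupPosition.get(streamKey + "|" + groupId);
    }
}

// Hypothetical broker that keeps its initialized set in memory only.
final class BrokerSketch {
    private final Set<String> initializedGroups = new HashSet<>();
    private final FakeRedisGroups redis;

    BrokerSketch(FakeRedisGroups redis) {
        this.redis = redis;
    }

    // Returns true only if this call actually created the group.
    boolean ensureGroup(String streamKey, String groupId, String entryId) {
        String key = streamKey + "|" + groupId;
        if (initializedGroups.contains(key)) {
            return false; // fast path: no call to Redis
        }
        boolean created;
        try {
            redis.xgroupCreate(streamKey, groupId, entryId);
            created = true;
        } catch (IllegalStateException busyGroup) {
            created = false; // group survived the restart; keep its position
        }
        initializedGroups.add(key);
        return created;
    }
}
```

Constructing a second BrokerSketch over the same FakeRedisGroups plays the role of a restart: the in-memory set is empty again, the create attempt fails, and the position stored by the first broker is left untouched.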

Alternative Approaches Considered

Option 1: Check Redis on Every Fetch

Query Redis to check if the consumer group exists before each XREADGROUP:

XINFO GROUPS stream-key

Rejected: This adds an extra Redis round-trip to every Fetch request, which increases latency on the hot path for a check that only matters on the first Fetch.

Option 2: Create Groups During SyncGroup

Create Redis consumer groups during SyncGroup with a hardcoded offset (e.g., "0" or "$"):

// In SyncGroupHandler
commands.xgroupCreate(streamOffset, groupId, XGroupCreateArgs.Builder.mkstream());

Rejected: The broker doesn’t know what offset to use. The client’s auto.offset.reset configuration is not sent to the broker.

Option 3: Lazy Initialization with Error Handling

Try XREADGROUP first, and if it fails with "NOGROUP", create the group:

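try {
    messages = commands.xreadgroup(...);
} catch (RedisCommandExecutionException e) {
    // Redis reports a missing group with a NOGROUP error
    if (!e.getMessage().startsWith("NOGROUP")) throw e;
    commands.xgroupCreate(...);
    messages = commands.xreadgroup(...);
}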

Rejected: This has the same problem as Option 2: we don’t know what offset to use when creating the group.