Consumer Group Initialization State Management
This document describes how Korvet tracks which consumer groups have been initialized in Redis Streams.
Problem Statement
Redis Streams consumer groups must be created with an initial offset using XGROUP CREATE. However, the broker doesn’t know what offset to use until the client sends its first FetchRequest.
The client determines the initial offset based on:
-
Committed offsets (from
OffsetFetchRequest) -
auto.offset.reset configuration (if no committed offset exists)
-
ListOffsets API (to resolve "earliest" or "latest" to actual offset)
The client sends this offset in the fetch_offset field of the first FetchRequest.
Design Decision
Redis consumer groups are created lazily on the first Fetch request, not during SyncGroup or OffsetFetch.
This approach:
-
✅ Allows the client to control the initial offset
-
✅ Avoids hardcoding offset assumptions in the broker
-
✅ Works correctly with
auto.offset.reset=earliestandauto.offset.reset=latest -
✅ Handles committed offsets correctly
Implementation
Broker State Tracking
The broker maintains in-memory state to track which consumer groups have been initialized:
// In ConsumerGroupRegistry
private final Set<String> initializedGroups = ConcurrentHashMap.newKeySet();
public boolean isGroupInitialized(String groupId, String topic, int partition) {
String key = groupKey(groupId, topic, partition);
return initializedGroups.contains(key);
}
public void markGroupInitialized(String groupId, String topic, int partition) {
String key = groupKey(groupId, topic, partition);
initializedGroups.add(key);
}
private String groupKey(String groupId, String topic, int partition) {
return groupId + ":" + topic + ":" + partition;
}
First Fetch Request
When FetchHandler receives a Fetch request for a consumer group:
// Check if group is initialized
if (!consumerGroupRegistry.isGroupInitialized(groupId, topic, partition)) {
// Decode fetch_offset to Redis Stream entry ID
String entryId = offsetMapper.toEntryId(fetchOffset);
// Create Redis consumer group
try {
commands.xgroupCreate(
XReadArgs.StreamOffset.from(streamKey, entryId),
groupId,
XGroupCreateArgs.Builder.mkstream()
);
} catch (RedisBusyException e) {
// Group already exists (e.g., after broker restart)
// The existing group position is preserved
log.debug("Consumer group already exists, using existing position");
}
// Mark as initialized
consumerGroupRegistry.markGroupInitialized(groupId, topic, partition);
}
// Now read using XREADGROUP with >
StreamMessage<String, byte[]> messages = commands.xreadgroup(
Consumer.from(groupId, consumerId),
XReadArgs.StreamOffset.lastConsumed(streamKey)
);
State Persistence
The initializedGroups set is not persisted to Redis. It’s purely in-memory state.
Broker Restart Behavior
When the broker restarts:
-
The
initializedGroupsset is empty -
On the first Fetch after restart, the broker attempts
XGROUP CREATE -
If the group already exists in Redis,
XGROUP CREATEfails withRedisBusyException -
The broker catches this exception and preserves the existing group position
-
The broker marks the group as initialized in memory
This ensures correctness even after broker restarts - the consumer group continues from where it left off.
Alternative Approaches Considered
Option 1: Check Redis on Every Fetch
Query Redis to check if the consumer group exists before each XREADGROUP:
XINFO GROUPS stream-key
Rejected: Adds an extra Redis round-trip on every Fetch request, which is unacceptable for performance.
Option 2: Create Groups During SyncGroup
Create Redis consumer groups during SyncGroup with a hardcoded offset (e.g., "0" or "$"):
// In SyncGroupHandler
commands.xgroupCreate(streamOffset, groupId, XGroupCreateArgs.Builder.mkstream());
Rejected: The broker doesn’t know what offset to use. The client’s auto.offset.reset configuration is not sent to the broker.
Option 3: Lazy Initialization with Error Handling
Try XREADGROUP first, and if it fails with "NOGROUP", create the group:
try {
messages = commands.xreadgroup(...);
} catch (NoGroupException e) {
commands.xgroupCreate(...);
messages = commands.xreadgroup(...);
}
Rejected: Same problem as Option 2 - we don’t know what offset to use when creating the group.