|
This version is still in development and is not considered stable yet. For the latest stable version, please use Korvet 0.12.5! |
Consumer Group Management
This page describes how Korvet handles Kafka consumer group operations.
| This page is implementation-oriented and focuses on Kafka-to-Redis mapping details. |
Join Consumer Group (Implicit Group Creation)
Kafka Wire Protocol
JoinGroupRequest {
group_id: "my-group"
session_timeout_ms: 45000
rebalance_timeout_ms: 300000
member_id: "" // empty for first join
group_instance_id: null
protocol_type: "consumer"
protocols: [
{
name: "range" // partition assignment strategy
metadata: {
version: 1
topics: ["orders"]
user_data: null
}
}
]
}
JoinGroupResponse {
throttle_time_ms: 0
error_code: 0
generation_id: 1
protocol_name: "range"
leader: "consumer-1-uuid" // first member becomes leader
member_id: "consumer-1-uuid" // assigned by broker
members: [
{
member_id: "consumer-1-uuid"
group_instance_id: null
metadata: {
version: 1
topics: ["orders"]
user_data: null
}
}
]
}
Redis Commands Detail
JoinGroup is handled by the broker-side group coordinator and consumer-group registry. Redis stream consumer groups are not created during JoinGroup.
Implementation Notes
-
Implicit Creation: Consumer groups are created automatically on first
JoinGrouprequest -
Initial Position: Redis consumer groups are created lazily on the first
Fetchrequest (notJoinGrouporSyncGroup)-
The client determines the initial offset based on:
-
OffsetFetchRequestresult (if group has committed offsets) -
auto.offset.resetconfiguration (if no committed offset exists) -
ListOffsetsAPI (to resolve "earliest" or "latest" to actual offset)
-
-
The client sends this offset in the
fetch_offsetfield of the firstFetchRequest -
The broker creates the Redis consumer group using this offset:
-
XGROUP CREATE
-
-
The broker tracks which groups have been initialized in memory
-
See Consumer Group Workflow and Consumer Group Initialization for complete details
-
-
Member ID Assignment:
-
Generate UUID for new members (empty
member_idin request) -
Reuse existing
member_idfor rejoining members
-
-
Generation ID:
-
Increments on each rebalance
-
Stored in group metadata
-
Used to detect stale member operations
-
-
Leader Election:
-
First member to join becomes leader
-
Leader performs partition assignment
-
Stored in group metadata
-
-
Protocol Selection:
-
Broker selects protocol supported by all members
-
Common protocols: range, roundrobin, sticky
-
-
Rebalance:
-
Triggered when members join/leave
-
All members must rejoin and get new partition assignments
-
-
Heartbeat: Members send periodic heartbeats to maintain membership (separate HeartbeatRequest)
-
Session Timeout: Member removed if no heartbeat within
session_timeout_ms
Delete Consumer Group
Kafka Wire Protocol
DeleteGroupsRequest {
groups_names: ["my-group", "old-group"]
}
DeleteGroupsResponse {
throttle_time_ms: 0
results: [
{
group_id: "my-group"
error_code: 0 // NONE
},
{
group_id: "old-group"
error_code: 69 // GROUP_ID_NOT_FOUND
}
]
}
Redis Commands Detail
DeleteGroups removes broker-side group state, asks the storage layer to delete group artifacts for known stream keys, and deletes explicit committed offsets for those stream keys.
Implementation Notes
-
Group Doesn’t Exist: Return
GROUP_ID_NOT_FOUND(error code 69) -
Active Members:
-
Cannot delete group with active members
-
Return
NON_EMPTY_GROUP(error code 68) -
Check broker in-memory state for active members
-
-
Storage Cleanup:
-
Group deletion delegates per-stream cleanup to the storage layer
-
Explicit committed offsets are also deleted from the committed-offset store
-
-
Cascading Effects:
-
Active consumers will get errors on next operation
-
Consumers should handle
UNKNOWN_MEMBER_IDand rejoin
-
-
Performance: The handler builds a stream-key list from known topics and partitions, then performs storage cleanup asynchronously