Consumer: Commit Offsets
This page describes how Korvet handles Kafka OffsetCommitRequest operations.
Consumer Commits Offset
Kafka Wire Protocol
OffsetCommitRequest (API Key: 8)
OffsetCommitRequest {
group_id: "my-group"
generation_id: 1 // from JoinGroup response
member_id: "consumer-1-uuid"
group_instance_id: null
retention_time_ms: -1 // -1 = use broker default
topics: [
{
name: "orders"
partitions: [
{
partition: 0
committed_offset: 44 // next offset to read
committed_leader_epoch: -1
committed_metadata: "processed batch 123"
}
]
}
]
}
OffsetCommitResponse (API Key: 8)
OffsetCommitResponse {
throttle_time_ms: 0
topics: [
{
name: "orders"
partitions: [
{
partition: 0
error_code: 0 // NONE
}
]
}
]
}
Redis Commands Detail
Step-by-step Redis Operations
# 1. Decode Kafka offset to entry ID
# Given committed_offset = 1264197008485889 with sequenceBits=10:
# This is the NEXT offset to read
# Last consumed offset = 1264197008485888
# timestamp = 1264197008485888 >> 10 = 1234567890123
# sequence = 1264197008485888 & 1023 = 0
# entry_id = "1234567890123-0"
# 2. Acknowledge message in Redis consumer group
XACK korvet:orders:0 my-group "1234567890123-0"
# Returns: 1 (number of messages acknowledged)
# Note: Redis Streams consumer groups track committed position internally
# No need to store offsets separately - XACK is the commit
Implementation Notes
-
Offset Semantics: Kafka commits the next offset to read, but Redis XACK requires the last consumed entry ID
-
Committed offset N means "next read from N"
-
Must XACK entry at offset N-1 (last consumed)
-
Decode offset N-1 to get entry ID for XACK
-
-
Consumer Group Validation:
-
Verify
member_idis active member of group (in-memory broker state) -
Return
UNKNOWN_MEMBER_ID(error code 25) if not found -
Return
ILLEGAL_GENERATION(error code 22) if generation_id mismatch
-
-
Batch Acknowledgment: Can XACK multiple entry IDs in single command for efficiency