Consumer: Fetch Committed Offsets
This page describes how Korvet handles Kafka OffsetFetchRequest operations.
Consumer Fetches Committed Offset
Kafka Wire Protocol
OffsetFetchRequest (API Key: 9)
OffsetFetchRequest {
  group_id: "my-group"
  topics: [
    {
      name: "orders"
      partitions: [0, 1, 2]
    },
    {
      name: "payments"
      partitions: [0]
    }
  ]
  require_stable: false
}
OffsetFetchResponse (API Key: 9)
OffsetFetchResponse {
  throttle_time_ms: 0
  topics: [
    {
      name: "orders"
      partitions: [
        {
          partition: 0
          committed_offset: 44
          committed_leader_epoch: -1
          metadata: "processed batch 123"
          error_code: 0
        },
        {
          partition: 1
          committed_offset: 20
          committed_leader_epoch: -1
          metadata: ""
          error_code: 0
        },
        {
          partition: 2
          committed_offset: -1 // no offset committed
          committed_leader_epoch: -1
          metadata: ""
          error_code: 0
        }
      ]
    },
    {
      name: "payments"
      partitions: [
        {
          partition: 0
          committed_offset: 100
          committed_leader_epoch: -1
          metadata: ""
          error_code: 0
        }
      ]
    }
  ]
  error_code: 0
}
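To illustrate how a client might interpret this response, here is a minimal Python sketch. It assumes the response has already been decoded into plain dictionaries mirroring the fields above; the dictionary layout and the helper name `split_by_commit_state` are hypothetical and not part of Korvet or any Kafka client library.

```python
NO_COMMITTED_OFFSET = -1  # sentinel used in the response for "nothing committed"


def split_by_commit_state(response: dict):
    """Split a decoded OffsetFetchResponse into resumable and uncommitted partitions.

    Returns (resume_positions, uncommitted), where resume_positions maps
    (topic, partition) to the committed offset and uncommitted lists the
    (topic, partition) pairs that reported -1.
    """
    resume_positions = {}
    uncommitted = []
    for topic in response["topics"]:
        for part in topic["partitions"]:
            key = (topic["name"], part["partition"])
            if part["error_code"] != 0:
                raise RuntimeError(
                    f"offset fetch failed for {key}: error {part['error_code']}"
                )
            if part["committed_offset"] == NO_COMMITTED_OFFSET:
                uncommitted.append(key)
            else:
                resume_positions[key] = part["committed_offset"]
    return resume_positions, uncommitted


# Hypothetical decoded form of the example response (unused fields omitted).
response = {
    "topics": [
        {
            "name": "orders",
            "partitions": [
                {"partition": 0, "committed_offset": 44, "error_code": 0},
                {"partition": 1, "committed_offset": 20, "error_code": 0},
                {"partition": 2, "committed_offset": -1, "error_code": 0},
            ],
        },
        {
            "name": "payments",
            "partitions": [
                {"partition": 0, "committed_offset": 100, "error_code": 0},
            ],
        },
    ],
    "error_code": 0,
}

resume_positions, uncommitted = split_by_commit_state(response)
print(resume_positions)  # partitions that can resume from a committed offset
print(uncommitted)       # partitions with no committed offset
```

For partitions that report -1, a Kafka consumer normally falls back to its `auto.offset.reset` policy to choose a starting position.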
Redis Commands Detail
Step-by-step Redis Operations
# 1. Query Redis Streams consumer group for last acknowledged position
XINFO GROUPS korvet:orders:0
# Returns group info including last-delivered-id for "my-group"
# 2. Inspect the oldest pending entry (delivered but not yet acknowledged)
XPENDING korvet:orders:0 my-group - + 1
# Returns details of the oldest pending entry, if any
# 3. Encode last acknowledged entry ID to Kafka offset
# If last-delivered-id = "1234567890123-5" with sequenceBits=10:
# offset = (1234567890123 << 10) | 5 = 1264197519485957
# committed_offset = offset + 1 = 1264197519485958 (next to read)
# Note: Redis Streams consumer groups track position internally
# (XREADGROUP advances last-delivered-id; XACK clears entries from the pending list)
# No separate offset storage needed - XINFO GROUPS provides the committed position
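The encoding in step 3 can be sketched in Python as follows. This is a minimal illustration of the shift-and-or arithmetic described above, not Korvet's actual code: the function names are hypothetical, and the range check on the sequence number is an added assumption about how overflow might be handled.

```python
def encode_offset(entry_id: str, sequence_bits: int = 10) -> int:
    """Pack a Redis Stream entry ID "<ms>-<seq>" into a single integer offset."""
    ms_text, seq_text = entry_id.split("-")
    ms, seq = int(ms_text), int(seq_text)
    # Assumption: a sequence number that does not fit in sequence_bits is rejected.
    if seq >= (1 << sequence_bits):
        raise ValueError(f"sequence {seq} does not fit in {sequence_bits} bits")
    return (ms << sequence_bits) | seq


def decode_offset(offset: int, sequence_bits: int = 10) -> str:
    """Inverse of encode_offset: recover the "<ms>-<seq>" entry ID."""
    mask = (1 << sequence_bits) - 1
    return f"{offset >> sequence_bits}-{offset & mask}"


def committed_offset(last_id: str, sequence_bits: int = 10) -> int:
    """Kafka committed offsets name the next record to read, hence the +1."""
    return encode_offset(last_id, sequence_bits) + 1


print(encode_offset("1234567890123-5"))     # 1264197519485957
print(committed_offset("1234567890123-5"))  # 1264197519485958
print(decode_offset(1264197519485957))      # 1234567890123-5
```

Because the mapping depends only on the entry ID and `sequenceBits`, it can be computed in either direction without storing any per-offset state.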
Implementation Notes
- No Offset Committed: Return offset -1 when a partition has no committed offset
- Group Doesn’t Exist:
  - Return offset -1 for all partitions
  - Set error_code: 0 (not an error in Kafka semantics)
  - Use XINFO GROUPS to check whether the consumer group exists on the stream
- Invalid Partition: Return UNKNOWN_TOPIC_OR_PARTITION (error code 3) for non-existent partitions
- Position Tracking: Redis Streams consumer groups maintain last-delivered-id automatically (XREADGROUP advances it; XACK clears acknowledged entries from the pending list)
- Encoding: Convert the Redis Stream entry ID to a Kafka offset using stateless encoding
- Coordinator Check: Verify this broker is the group coordinator (based on group_id hash)
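The per-partition rules in these notes could be combined roughly as in the Python sketch below. It is an illustration under stated assumptions, not Korvet's implementation: `fetch_partition_offset` is a hypothetical helper, `groups` stands for the already-parsed XINFO GROUPS reply for the partition's stream (a list of dictionaries with `name` and `last-delivered-id` keys), and `None` is assumed to mean that the stream, and therefore the partition, does not exist.

```python
from typing import Optional

ERROR_NONE = 0
UNKNOWN_TOPIC_OR_PARTITION = 3
NO_COMMITTED_OFFSET = -1


def fetch_partition_offset(
    groups: Optional[list], group_id: str, sequence_bits: int = 10
) -> dict:
    """Build the per-partition result of an offset fetch.

    groups: parsed XINFO GROUPS reply for the partition's stream, or None
            when the stream is missing (assumed to mean an invalid partition).
    """
    if groups is None:
        # Invalid partition: report error code 3.
        return {
            "committed_offset": NO_COMMITTED_OFFSET,
            "error_code": UNKNOWN_TOPIC_OR_PARTITION,
        }
    for group in groups:
        if group["name"] == group_id:
            ms_text, seq_text = group["last-delivered-id"].split("-")
            offset = (int(ms_text) << sequence_bits) | int(seq_text)
            # Committed offset is the next entry to read.
            return {"committed_offset": offset + 1, "error_code": ERROR_NONE}
    # Group does not exist on this stream: -1 with no error.
    return {"committed_offset": NO_COMMITTED_OFFSET, "error_code": ERROR_NONE}


groups = [{"name": "my-group", "last-delivered-id": "1234567890123-5"}]
print(fetch_partition_offset(groups, "my-group"))
print(fetch_partition_offset(groups, "other-group"))
print(fetch_partition_offset(None, "my-group"))
```

Passing the parsed reply in as an argument keeps the decision logic separate from the Redis client, so it can be exercised without a running server.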