Consumer: Fetch Committed Offsets

This page describes how Korvet handles Kafka OffsetFetchRequest operations.

Consumer Fetches Committed Offset

Kafka Wire Protocol

OffsetFetchRequest (API Key: 9)
OffsetFetchRequest {
  group_id: "my-group"
  topics: [
    {
      name: "orders"
      partitions: [0, 1, 2]
    },
    {
      name: "payments"
      partitions: [0]
    }
  ]
  require_stable: false
}
OffsetFetchResponse (API Key: 9)
OffsetFetchResponse {
  throttle_time_ms: 0
  topics: [
    {
      name: "orders"
      partitions: [
        {
          partition: 0
          committed_offset: 44
          committed_leader_epoch: -1
          metadata: "processed batch 123"
          error_code: 0
        },
        {
          partition: 1
          committed_offset: 20
          committed_leader_epoch: -1
          metadata: ""
          error_code: 0
        },
        {
          partition: 2
          committed_offset: -1        // no offset committed
          committed_leader_epoch: -1
          metadata: ""
          error_code: 0
        }
      ]
    },
    {
      name: "payments"
      partitions: [
        {
          partition: 0
          committed_offset: 100
          committed_leader_epoch: -1
          metadata: ""
          error_code: 0
        }
      ]
    }
  ]
  error_code: 0
}

Sequence Diagram

OffsetFetchRequest Sequence

Redis Commands Detail

Step-by-step Redis Operations
# 1. Query Redis Streams consumer group for last acknowledged position
XINFO GROUPS korvet:orders:0
# Returns group info including last-delivered-id for "my-group"

# 2. Get pending entries count (messages delivered but not acknowledged)
XPENDING korvet:orders:0 my-group - + 1
# Returns info about oldest pending message

# 3. Encode last acknowledged entry ID to Kafka offset
# If last-delivered-id = "1234567890123-5" with sequenceBits=10:
#   offset = (1234567890123 << 10) | 5 = 1264197008485893
#   committed_offset = offset + 1 = 1264197008485894 (next to read)

# Note: Redis Streams consumer groups track position internally via XACK
# No separate offset storage needed - XINFO GROUPS provides the committed position

Implementation Notes

  • No Offset Committed: Return offset -1 when partition has no committed offset

  • Group Doesn’t Exist:

    • Return offset -1 for all partitions

    • Set error_code: 0 (not an error in Kafka semantics)

    • Use XINFO GROUPS to check if consumer group exists on stream

  • Invalid Partition: Return UNKNOWN_TOPIC_OR_PARTITION (error code 3) for non-existent partitions

  • Position Tracking: Redis Streams consumer groups track last-delivered-id automatically via XACK

  • Encoding: Convert Redis Stream entry ID to Kafka offset using stateless encoding

  • Coordinator Check: Verify this broker is the group coordinator (based on group_id hash)