Consumer: Commit Offsets

This page describes how Korvet handles Kafka OffsetCommitRequest operations.

Consumer Commits Offset

Kafka Wire Protocol

OffsetCommitRequest (API Key: 8)
OffsetCommitRequest {
  group_id: "my-group"
  generation_id: 1           // from JoinGroup response
  member_id: "consumer-1-uuid"
  group_instance_id: null
  retention_time_ms: -1      // -1 = use broker default
  topics: [
    {
      name: "orders"
      partitions: [
        {
          partition: 0
          committed_offset: 44      // next offset to read
          committed_leader_epoch: -1
          committed_metadata: "processed batch 123"
        }
      ]
    }
  ]
}
OffsetCommitResponse (API Key: 8)
OffsetCommitResponse {
  throttle_time_ms: 0
  topics: [
    {
      name: "orders"
      partitions: [
        {
          partition: 0
          error_code: 0          // NONE
        }
      ]
    }
  ]
}

Sequence Diagram

OffsetCommitRequest Sequence

Redis Commands Detail

Step-by-step Redis Operations
# 1. Decode Kafka offset to entry ID
# Given committed_offset = 1264197008485889 with sequenceBits=10:
#   This is the NEXT offset to read
#   Last consumed offset = 1264197008485888
#   timestamp = 1264197008485888 >> 10 = 1234567890123
#   sequence = 1264197008485888 & 1023 = 0
#   entry_id = "1234567890123-0"

# 2. Acknowledge message in Redis consumer group
XACK korvet:orders:0 my-group "1234567890123-0"
# Returns: 1 (number of messages acknowledged)

# Note: Redis Streams consumer groups track committed position internally
# No need to store offsets separately - XACK is the commit

Implementation Notes

  • Offset Semantics: Kafka commits the next offset to read, but Redis XACK requires the last consumed entry ID

    • Committed offset N means "next read from N"

    • Must XACK entry at offset N-1 (last consumed)

    • Decode offset N-1 to get entry ID for XACK

  • Consumer Group Validation:

    • Verify member_id is active member of group (in-memory broker state)

    • Return UNKNOWN_MEMBER_ID (error code 25) if not found

    • Return ILLEGAL_GENERATION (error code 22) if generation_id mismatch

  • Batch Acknowledgment: Can XACK multiple entry IDs in single command for efficiency