Consumer: Fetch Messages

This page describes how Korvet handles Kafka FetchRequest operations.

Consumer Fetches Messages

Kafka Wire Protocol

FetchRequest (API Key: 1)
FetchRequest {
  replica_id: -1             // -1 for consumer (not replica)
  max_wait_ms: 500           // max time to wait if min_bytes not met
  min_bytes: 1               // min bytes to accumulate before responding
  max_bytes: 52428800        // max bytes for entire response (50MB)
  isolation_level: 0         // 0=read_uncommitted, 1=read_committed
  session_id: 0
  session_epoch: -1
  topics: [
    {
      name: "orders"
      partitions: [
        {
          partition: 0
          fetch_offset: 42        // start reading from offset 42
          log_start_offset: 0
          partition_max_bytes: 1048576  // max 1MB per partition
        }
      ]
    }
  ]
  forgotten_topics: []
}
FetchResponse (API Key: 1)
FetchResponse {
  throttle_time_ms: 0
  error_code: 0
  session_id: 0
  topics: [
    {
      name: "orders"
      partitions: [
        {
          partition: 0
          error_code: 0          // NONE
          high_watermark: 44     // next offset to be written
          last_stable_offset: 44
          log_start_offset: 0
          aborted_transactions: []
          preferred_read_replica: -1
          records: [
            {
              offset: 42
              timestamp: 1234567890000
              key: "order-123"
              value: {"product": "widget", "quantity": 5}
              headers: [
                {key: "source", value: "web"},
                {key: "version", value: "1.0"}
              ]
            },
            {
              offset: 43
              timestamp: 1234567891000
              key: "order-124"
              value: {"product": "gadget", "quantity": 3}
              headers: []
            }
          ]
        }
      ]
    }
  ]
}

Sequence Diagram

FetchRequest Sequence

Redis Commands Detail

Step-by-step Redis Operations
# 1. Decode Kafka offset to Redis Stream entry ID
# Given Kafka offset (e.g., 1264197519485952) with sequenceBits=10:
#   timestamp = offset >> 10 = 1234567890123
#   sequence = offset & 1023 = 0
#   entry_id = "1234567890123-0"

# 2. Read messages from stream starting at entry ID
XREAD COUNT 100 STREAMS korvet:orders:0 1234567890123-0
# Returns:
# [
#   ["korvet:orders:0", [
#     ["1234567890123-0",
#      ["key", "order-123",
#       "value", "{\"product\":\"widget\",\"quantity\":5}",
#       "timestamp", "1234567890000",
#       "header.source", "web",
#       "header.version", "1.0"]
#     ],
#     ["1234567890123-1",
#      ["key", "order-124",
#       "value", "{\"product\":\"gadget\",\"quantity\":3}",
#       "timestamp", "1234567891000"]
#     ]
#   ]]
# ]

# 3. Encode each entry ID back to Kafka offset for response
# For "1234567890123-0": offset = (1234567890123 << 10) | 0 = 1264197519485952
# For "1234567890123-1": offset = (1234567890123 << 10) | 1 = 1264197519485953
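The stateless conversion between Kafka offsets and Redis Stream entry IDs can be sketched in Python (sequenceBits=10 as in the example above; the helper names are illustrative, not Korvet's actual API):

```python
SEQUENCE_BITS = 10
SEQUENCE_MASK = (1 << SEQUENCE_BITS) - 1  # 1023

def offset_to_entry_id(offset: int) -> str:
    """Decode a Kafka offset into a Redis Stream entry ID ("timestamp-sequence")."""
    timestamp = offset >> SEQUENCE_BITS
    sequence = offset & SEQUENCE_MASK
    return f"{timestamp}-{sequence}"

def entry_id_to_offset(entry_id: str) -> int:
    """Encode a Redis Stream entry ID back into a Kafka offset."""
    timestamp, sequence = (int(part) for part in entry_id.split("-"))
    return (timestamp << SEQUENCE_BITS) | sequence
```

Because both directions are pure bit operations, no lookup table or Redis round trip is needed for the translation itself.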

Partition Metadata Fields

Each partition in the FetchResponse includes critical metadata that consumers use for offset management and monitoring:

high_watermark

The high watermark (HWM) is the offset of the next message that will be written to the partition. In other words, it’s one greater than the offset of the last message in the partition.

Calculation
# Get stream info including first and last entry IDs
XINFO STREAM korvet:orders:0
# Returns stream metadata including:
#   "last-entry" -> ["1234567890123-5", [...]]

# Convert last entry ID to Kafka offset and add 1
# If last entry ID is "1234567890123-5":
#   last_offset = (1234567890123 << 10) | 5 = 1264197519485957
#   high_watermark = 1264197519485957 + 1 = 1264197519485958
Consumer Usage
  • Consumers use HWM to determine if they’ve reached the end of the partition

  • Lag calculation: lag = high_watermark - consumer_position

  • The endOffsets() API returns the high watermark for each partition
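The HWM derivation above can be sketched as a pure function, assuming the last entry ID has already been read from XINFO STREAM (the `None` handling for an empty stream is an assumption, not confirmed Korvet behavior):

```python
SEQUENCE_BITS = 10

def high_watermark(last_entry_id):
    """HWM = offset of the last entry + 1; assumed 0 for an empty stream."""
    if last_entry_id is None:  # assumption: empty stream reports HWM 0
        return 0
    timestamp, sequence = (int(part) for part in last_entry_id.split("-"))
    return ((timestamp << SEQUENCE_BITS) | sequence) + 1
```

Consumer lag then follows directly as `high_watermark(last_entry_id) - consumer_position`.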

log_start_offset

The log start offset is the offset of the first available message in the partition. This may be greater than 0 if messages have been deleted due to retention policies or manual trimming.

Calculation
# Get stream info including first and last entry IDs
XINFO STREAM korvet:orders:0
# Returns stream metadata including:
#   "first-entry" -> ["1234567890100-0", [...]]

# Convert first entry ID to Kafka offset
# If first entry ID is "1234567890100-0":
#   log_start_offset = (1234567890100 << 10) | 0 = 1264197519462400
Consumer Usage
  • Consumers seeking to "earliest" will start from this offset

  • If a consumer requests an offset below log_start_offset, Korvet returns OFFSET_OUT_OF_RANGE error

  • Monitoring tools use this to detect message deletion/trimming
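The OFFSET_OUT_OF_RANGE check described above can be sketched as follows (error code 1 is Kafka's standard OFFSET_OUT_OF_RANGE; the function name is illustrative):

```python
SEQUENCE_BITS = 10
NONE = 0                  # Kafka error code: no error
OFFSET_OUT_OF_RANGE = 1   # Kafka error code: requested offset not available

def entry_id_to_offset(entry_id: str) -> int:
    timestamp, sequence = (int(part) for part in entry_id.split("-"))
    return (timestamp << SEQUENCE_BITS) | sequence

def validate_fetch_offset(fetch_offset: int, first_entry_id: str) -> int:
    """Return NONE if the requested offset is still available, else OFFSET_OUT_OF_RANGE."""
    log_start_offset = entry_id_to_offset(first_entry_id)
    return NONE if fetch_offset >= log_start_offset else OFFSET_OUT_OF_RANGE
```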

last_stable_offset

The last stable offset (LSO) is the offset up to which all transactions have been completed. Since Korvet does not support transactions, LSO always equals the high watermark.

last_stable_offset = high_watermark  // No transaction support

Implementation Notes

  • Empty Response: If no messages are available and max_wait_ms > 0, wait up to max_wait_ms for new messages

  • Offset Decoding: Stateless conversion from Kafka offset to Stream entry ID using bit operations

  • Max Bytes Handling:

    • Accumulate records until partition_max_bytes reached

    • Always return at least one record (even if exceeds max_bytes)

  • Compression: Apply compression based on topic’s compression.type config before sending response

  • Metadata Caching: Consider caching log_start_offset and high_watermark values briefly (e.g., 1 second) to avoid querying Redis on every fetch

  • Performance: XINFO STREAM and XREAD/XREADGROUP commands are batched in parallel to minimize Redis round trips
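The max-bytes rule above (accumulate until the budget is hit, but never return an empty batch when data exists) can be sketched as follows. This is a simplification: sizes here are raw byte lengths, whereas real record sizing includes batch framing overhead.

```python
def take_up_to_max_bytes(records, partition_max_bytes):
    """Take records until partition_max_bytes is reached, but always take at least one."""
    taken, size = [], 0
    for record in records:
        record_size = len(record)
        # Only stop once at least one record has been taken, so an oversized
        # first record is still returned (matching Kafka fetch semantics).
        if taken and size + record_size > partition_max_bytes:
            break
        taken.append(record)
        size += record_size
    return taken
```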

Kafka FetchRequest
FetchRequest {
  topics: [
    {
      name: "orders"
      partitions: [
        { index: 0, fetchOffset: 5, maxBytes: 1048576 }
      ]
    }
  ]
  maxWaitMs: 500
  minBytes: 1
}
Redis Commands
# Standalone consumer: Decode Kafka offset to entry ID
#   timestamp = 5 >> 10 = 0
#   sequence = 5 & 1023 = 5
#   entry_id = "0-5"
XREAD COUNT 100 STREAMS korvet:orders:0 0-5

# Consumer group: ALWAYS use > (messages never delivered to this group)
# The starting position was set on first Fetch (XGROUP CREATE with fetch_offset)
# Redis tracks the group's position internally via last-delivered-id
XREADGROUP GROUP my-group consumer-1 COUNT 100 STREAMS korvet:orders:0 >

# Note: The fetch_offset in FetchRequest is used ONLY on first Fetch to initialize group
# After initialization, consumer groups always read from where the group left off (tracked by Redis)
Response Mapping
  1. Redis returns entries with IDs and field-value pairs

  2. Convert each entry to Kafka record format

  3. Encode each entry ID to Kafka offset: offset = (timestamp << sequenceBits) | sequence

  4. Build FetchResponse with records, partition metadata, and high watermark
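The mapping steps above can be sketched for a single entry, using the field layout from the XREAD example (`key`, `value`, `timestamp`, and `header.*` fields; the function name is illustrative):

```python
SEQUENCE_BITS = 10

def entry_to_record(entry_id, fields):
    """Map one Redis Stream entry (ID + flat field-value list) to a Kafka-style record."""
    timestamp, sequence = (int(part) for part in entry_id.split("-"))
    pairs = dict(zip(fields[::2], fields[1::2]))  # flat list -> field map
    headers = [
        {"key": name.removeprefix("header."), "value": value}
        for name, value in pairs.items() if name.startswith("header.")
    ]
    return {
        "offset": (timestamp << SEQUENCE_BITS) | sequence,
        "timestamp": int(pairs["timestamp"]),
        "key": pairs.get("key"),
        "value": pairs.get("value"),
        "headers": headers,
    }
```

A full FetchResponse is then assembled by applying this per entry and attaching the partition metadata (high_watermark, log_start_offset, last_stable_offset) described earlier.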