Consumer: Fetch Messages

This page describes how Korvet handles Kafka FetchRequest operations.

This page is implementation-oriented and focuses on Kafka-to-Redis mapping details.

Consumer Fetches Messages

Kafka Wire Protocol

FetchRequest (API Key: 1)
FetchRequest {
  replica_id: -1             // -1 for consumer (not replica)
  max_wait_ms: 500           // max time to wait if min_bytes not met
  min_bytes: 1               // min bytes to accumulate before responding
  max_bytes: 52428800        // max bytes for entire response (50MB)
  isolation_level: 0         // 0=read_uncommitted, 1=read_committed
  session_id: 0
  session_epoch: -1
  topics: [
    {
      name: "orders"
      partitions: [
        {
          partition: 0
          fetch_offset: 42        // start reading from offset 42
          log_start_offset: 0
          partition_max_bytes: 1048576  // max 1MB per partition
        }
      ]
    }
  ]
  forgotten_topics: []
}
FetchResponse (API Key: 1)
FetchResponse {
  throttle_time_ms: 0
  error_code: 0
  session_id: 0
  topics: [
    {
      name: "orders"
      partitions: [
        {
          partition: 0
          error_code: 0          // NONE
          high_watermark: 44     // next offset to be written
          last_stable_offset: 44
          log_start_offset: 0
          aborted_transactions: []
          preferred_read_replica: -1
          records: [
            {
              offset: 42
              timestamp: 1234567890000
              key: "order-123"
              value: {"product": "widget", "quantity": 5}
              headers: [
                {key: "source", value: "web"},
                {key: "version", value: "1.0"}
              ]
            },
            {
              offset: 43
              timestamp: 1234567891000
              key: "order-124"
              value: {"product": "gadget", "quantity": 3}
              headers: []
            }
          ]
        }
      ]
    }
  ]
}

Sequence Diagram

FetchRequest Sequence

Redis Commands Detail

Standalone fetches convert fetch_offset to an exclusive Redis start ID and read with XREAD.

Consumer-group fetches resolve the member and group from broker-side registry state, lazily create the Redis consumer group on the first fetch, then read with XREADGROUP … >.
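For standalone fetches, one way to obtain an exclusive start ID is to decode the offset immediately before fetch_offset, because XREAD only returns entries whose IDs are greater than the ID supplied. The following is a minimal sketch, assuming the 10-bit sequence layout implied by the `<< 10` shift used elsewhere on this page. The names SEQUENCE_BITS, offset_to_entry_id, and exclusive_start_id are hypothetical and are not Korvet APIs.

```python
SEQUENCE_BITS = 10  # assumption: matches the "<< 10" shift used on this page
SEQUENCE_MASK = (1 << SEQUENCE_BITS) - 1


def offset_to_entry_id(offset: int) -> str:
    """Split a Kafka offset into a '<milliseconds>-<sequence>' stream entry ID."""
    return f"{offset >> SEQUENCE_BITS}-{offset & SEQUENCE_MASK}"


def exclusive_start_id(fetch_offset: int) -> str:
    """Return an ID for XREAD so the first entry returned is at fetch_offset or later."""
    if fetch_offset <= 0:
        # Reading after 0-0 yields the stream from its first entry.
        return "0-0"
    # XREAD is exclusive, so start just before the requested offset.
    return offset_to_entry_id(fetch_offset - 1)
```

When the sequence part of fetch_offset is 0, the subtraction borrows from the millisecond part, so the start ID ends in the maximum sequence value of the previous millisecond.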

Partition Metadata Fields

Each partition in the FetchResponse includes critical metadata that consumers use for offset management and monitoring:

high_watermark

The high watermark (HWM) is the offset of the next message that will be written to the partition. In other words, it’s one greater than the offset of the last message in the partition.

Calculation
# Get stream info including first and last entry IDs
XINFO STREAM korvet:orders:0
# Returns stream metadata including:
#   "last-entry" -> ["1234567890123-5", [...]]

# Convert last entry ID to Kafka offset and add 1
# If last entry ID is "1234567890123-5":
#   last_offset = (1234567890123 << 10) | 5 = 1264197519485957
#   high_watermark = 1264197519485957 + 1 = 1264197519485958
Consumer Usage
  • Consumers use HWM to determine if they’ve reached the end of the partition

  • Lag calculation: lag = high_watermark - consumer_position

  • The endOffsets() API returns the high watermark for each partition
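The high watermark and lag calculations above can be sketched in a few lines. This is an illustration only, assuming a 10-bit sequence field as in the shift shown in the calculation; the function names are hypothetical, and an empty stream (no last entry) is not handled here.

```python
SEQUENCE_BITS = 10  # assumption: matches the "<< 10" shift in the calculation


def entry_id_to_offset(entry_id: str) -> int:
    """Encode a '<milliseconds>-<sequence>' stream entry ID as a Kafka offset."""
    ms, seq = (int(part) for part in entry_id.split("-"))
    if seq >= (1 << SEQUENCE_BITS):
        raise ValueError("sequence does not fit in the sequence bits")
    return (ms << SEQUENCE_BITS) | seq


def high_watermark(last_entry_id: str) -> int:
    """Offset of the next message to be written: last offset plus one."""
    return entry_id_to_offset(last_entry_id) + 1


def consumer_lag(hwm: int, consumer_position: int) -> int:
    """lag = high_watermark - consumer_position, clamped at zero."""
    return max(0, hwm - consumer_position)
```

Because offsets embed a millisecond timestamp, the lag computed this way is a distance in offset space and not necessarily a count of messages.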

log_start_offset

The log start offset is the offset of the first available message in the partition. This may be greater than 0 if messages have been deleted due to retention policies or manual trimming.

Calculation
# Get stream info including first and last entry IDs
XINFO STREAM korvet:orders:0
# Returns stream metadata including:
#   "first-entry" -> ["1234567890100-0", [...]]

# Convert first entry ID to Kafka offset
# If first entry ID is "1234567890100-0":
#   log_start_offset = (1234567890100 << 10) | 0 = 1264197519462400
Consumer Usage
  • Consumers seeking to "earliest" will start from this offset

  • If a consumer requests an offset below log_start_offset, Korvet returns an OFFSET_OUT_OF_RANGE error

  • Monitoring tools use this to detect message deletion/trimming
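The lower-bound check described above can be sketched as follows. It assumes the 10-bit sequence layout from the calculation and uses hypothetical function names; it covers only the below-log-start case stated on this page and returns error names as plain strings for readability.

```python
SEQUENCE_BITS = 10  # assumption: matches the "<< 10" shift in the calculation


def entry_id_to_offset(entry_id: str) -> int:
    """Encode a '<milliseconds>-<sequence>' stream entry ID as a Kafka offset."""
    ms, seq = (int(part) for part in entry_id.split("-"))
    return (ms << SEQUENCE_BITS) | seq


def log_start_offset(first_entry_id: str) -> int:
    """Offset of the first entry still present in the stream."""
    return entry_id_to_offset(first_entry_id)


def check_fetch_offset(fetch_offset: int, first_entry_id: str) -> str:
    """Return the error name for a fetch at fetch_offset."""
    if fetch_offset < log_start_offset(first_entry_id):
        return "OFFSET_OUT_OF_RANGE"
    return "NONE"
```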

last_stable_offset

The last stable offset (LSO) is the offset up to which all transactions have been completed. Since Korvet does not support transactions, LSO always equals the high watermark.

last_stable_offset = high_watermark  // No transaction support

Implementation Notes

  • Empty Response: If no messages are available and max_wait_ms > 0, Korvet waits up to max_wait_ms for new messages before responding

  • Offset Decoding: Stateless conversion from Kafka offset to Stream entry ID using bit operations

  • Unknown Topics: Unknown topics are filtered before read execution and returned as UNKNOWN_TOPIC_OR_PARTITION.

  • High Watermark / Log Start: The broker derives high_watermark and log_start_offset from stream metadata after the read.

  • Group Initialization: First group fetches create Redis groups using a start ID derived from fetch_offset, not a fixed 0 or $ position.

  • Delivered Message Tracking: Group fetches track delivered Redis message IDs in broker memory so later offset commits can acknowledge them.

  • Performance: Reads and metadata lookups are batched, and blocking group fetches may be sliced into shorter polling windows.
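To illustrate the idea of slicing a blocking fetch into shorter polling windows, here is a minimal sketch. It is not Korvet's implementation: read_once is a hypothetical callable that blocks for at most the given number of milliseconds (with 0 meaning a non-blocking read), and the 100 ms default slice is an arbitrary assumption.

```python
import time
from typing import Callable, List


def _monotonic_ms() -> int:
    """Monotonic clock in whole milliseconds."""
    return int(time.monotonic() * 1000)


def fetch_with_wait(
    read_once: Callable[[int], List],
    max_wait_ms: int,
    slice_ms: int = 100,  # assumption: arbitrary window length
    clock_ms: Callable[[], int] = _monotonic_ms,
) -> List:
    """Poll read_once in windows of at most slice_ms until records arrive
    or max_wait_ms has elapsed."""
    deadline = clock_ms() + max_wait_ms
    while True:
        remaining = deadline - clock_ms()
        # Never block longer than one slice or past the deadline.
        records = read_once(max(0, min(slice_ms, remaining)))
        if records or remaining <= 0:
            return records
```

Short windows let the broker re-check state such as group membership between reads, at the cost of more round trips to Redis.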

Kafka FetchRequest
FetchRequest {
  topics: [
    {
      name: "orders"
      partitions: [
        { index: 0, fetchOffset: 5, maxBytes: 1048576 }
      ]
    }
  ]
  maxWaitMs: 500
  minBytes: 1
}
Redis Commands
XREAD COUNT <n> STREAMS korvet:orders:0 <exclusive-start-id>
XGROUP CREATE korvet:orders:0 my-group <start-id> MKSTREAM
XREADGROUP GROUP my-group consumer-1 COUNT <n> STREAMS korvet:orders:0 >
Response Mapping
  1. Redis returns entries with IDs and field-value pairs

  2. Convert each entry to Kafka record format

  3. Encode each entry ID to Kafka offset: offset = (timestamp << sequenceBits) | sequence

  4. Build FetchResponse with records, partition metadata, and high watermark
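The four mapping steps can be sketched as a single function. This is a simplified illustration under stated assumptions: sequenceBits is taken to be 10, entries are modeled as (entry ID, field dictionary) pairs, and the stream fields are passed through unchanged because the field layout Korvet uses for keys, values, and headers is not described on this page. All names are hypothetical.

```python
from typing import Dict, List, Tuple

SEQUENCE_BITS = 10  # assumption: value of sequenceBits

Entry = Tuple[str, Dict[str, str]]  # (stream entry ID, field-value pairs)


def entry_id_to_offset(entry_id: str) -> int:
    """offset = (timestamp << sequenceBits) | sequence"""
    ms, seq = (int(part) for part in entry_id.split("-"))
    return (ms << SEQUENCE_BITS) | seq


def build_partition_response(
    partition: int,
    entries: List[Entry],
    first_entry_id: str,
    last_entry_id: str,
) -> dict:
    """Map Redis stream entries to a FetchResponse-like partition dictionary."""
    records = [
        {"offset": entry_id_to_offset(entry_id), "fields": dict(fields)}
        for entry_id, fields in entries
    ]
    high_watermark = entry_id_to_offset(last_entry_id) + 1
    return {
        "partition": partition,
        "error_code": 0,  # NONE
        "high_watermark": high_watermark,
        "last_stable_offset": high_watermark,  # no transaction support
        "log_start_offset": entry_id_to_offset(first_entry_id),
        "records": records,
    }
```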