List Offsets (Find Beginning/End)

This page describes how Korvet handles Kafka ListOffsetsRequest operations.

List Offsets (Find Beginning/End)

Kafka Wire Protocol

ListOffsetsRequest (API Key: 2)
ListOffsetsRequest {
  replica_id: -1             // -1 for client (not replica)
  isolation_level: 0         // 0=read_uncommitted, 1=read_committed
  topics: [
    {
      name: "orders"
      partitions: [
        {
          partition: 0
          timestamp: -2        // -2 = earliest, -1 = latest
          current_leader_epoch: -1
        },
        {
          partition: 1
          timestamp: -1        // latest
          current_leader_epoch: -1
        },
        {
          partition: 2
          timestamp: 1234567890000  // specific timestamp
          current_leader_epoch: -1
        }
      ]
    }
  ]
}
ListOffsetsResponse (API Key: 2)
ListOffsetsResponse {
  throttle_time_ms: 0
  topics: [
    {
      name: "orders"
      partitions: [
        {
          partition: 0
          error_code: 0
          timestamp: 1234567890000    // timestamp of earliest message
          offset: 0                   // earliest offset
          leader_epoch: -1
        },
        {
          partition: 1
          error_code: 0
          timestamp: -1               // -1 for latest (no message)
          offset: 44                  // next offset to be written
          leader_epoch: -1
        },
        {
          partition: 2
          error_code: 0
          timestamp: 1234567890500    // actual timestamp found
          offset: 25                  // offset at/after requested timestamp
          leader_epoch: -1
        }
      ]
    }
  ]
}

Sequence Diagram

ListOffsetsRequest Sequence

Redis Commands Detail

Earliest Offset (timestamp = -2)
# 1. Get stream info to find first entry
XINFO STREAM korvet:orders:0
# Returns stream metadata including:
# - first-entry: ["1234567890123-0", ["key", "order-123", "value", "...", "timestamp", "1234567890000"]]
# - last-entry: ["1234567890999-5", [...]]
# - length: 1000

# 2. Encode first entry ID to Kafka offset
# For "1234567890123-0" with sequenceBits=10:
#   offset = (1234567890123 << 10) | 0 = 1264197008485888

# 3. Extract timestamp from first entry fields
# From entry: "timestamp" field = "1234567890000"
Latest Offset (timestamp = -1)
# Get stream info to find last entry
XINFO STREAM korvet:orders:0
# Returns stream metadata including:
# - last-entry: ["1234567890999-5", [...]]

# Encode last entry ID to Kafka offset
# For "1234567890999-5" with sequenceBits=10:
#   offset = (1234567890999 << 10) | 5

# Timestamp is -1 (convention for "latest")
Specific Timestamp (timestamp = 1234567890000)
# 1. Convert timestamp to Redis Stream ID format (timestamp in ms)
# Kafka timestamp: 1234567890000
# Redis Stream ID: 1234567890000-0 (timestamp-sequence)

# 2. Find first entry at or after timestamp
XREAD COUNT 1 STREAMS korvet:orders:2 1234567890000-0
# Returns: [["korvet:orders:2", [["1234567890500-0", ["key", "...", "timestamp", "1234567890500"]]]]]

# 3. Encode entry ID to Kafka offset
# For "1234567890500-0" with sequenceBits=10:
#   offset = (1234567890500 << 10) | 0

# 4. Extract timestamp from entry
# From entry: "timestamp" field = "1234567890500"

# Alternative: If no entry found at/after timestamp
XREAD COUNT 1 STREAMS korvet:orders:2 1234567890000-0
# Returns: [] (empty) or [["korvet:orders:2", []]]
# Response: offset = 0, timestamp = -1

Implementation Notes

  • Earliest Offset (timestamp = -2):

    • Use XINFO STREAM to get first-entry from stream metadata

    • Encode entry ID to Kafka offset

    • Return timestamp from entry’s timestamp field

    • If stream empty or doesn’t exist: return offset 0, timestamp -1

  • Latest Offset (timestamp = -1):

    • Use XINFO STREAM to get last-entry from stream metadata

    • Encode entry ID to Kafka offset

    • Timestamp is always -1 (convention for "latest")

  • Specific Timestamp:

    • Convert Kafka timestamp (ms) to Redis Stream ID format: {timestamp}-0

    • Use XREAD COUNT 1 STREAMS stream {timestamp}-0 to find first entry at/after timestamp

    • If no entry found: return offset 0 with timestamp -1

    • Extract actual timestamp from entry fields

  • Empty Partition:

    • XINFO STREAM returns error if stream doesn’t exist

    • Return: offset 0, timestamp -1

  • Performance: XINFO STREAM is O(1) for finding earliest/latest offsets