List Offsets (Find Beginning/End)
This page describes how Korvet handles Kafka ListOffsetsRequest operations.
List Offsets (Find Beginning/End)
Kafka Wire Protocol
ListOffsetsRequest {
replica_id: -1 // -1 for client (not replica)
isolation_level: 0 // 0=read_uncommitted, 1=read_committed
topics: [
{
name: "orders"
partitions: [
{
partition: 0
timestamp: -2 // -2 = earliest, -1 = latest
current_leader_epoch: -1
},
{
partition: 1
timestamp: -1 // latest
current_leader_epoch: -1
},
{
partition: 2
timestamp: 1234567890000 // specific timestamp
current_leader_epoch: -1
}
]
}
]
}
ListOffsetsResponse {
throttle_time_ms: 0
topics: [
{
name: "orders"
partitions: [
{
partition: 0
error_code: 0
timestamp: 1234567890000 // timestamp of earliest message
offset: 0 // earliest offset
leader_epoch: -1
},
{
partition: 1
error_code: 0
timestamp: -1 // -1 for latest (no message)
offset: 44 // next offset to be written
leader_epoch: -1
},
{
partition: 2
error_code: 0
timestamp: 1234567890500 // actual timestamp found
offset: 25 // offset at/after requested timestamp
leader_epoch: -1
}
]
}
]
}
Redis Commands Detail
# 1. Get stream info to find first entry
XINFO STREAM korvet:orders:0
# Returns stream metadata including:
# - first-entry: ["1234567890123-0", ["key", "order-123", "value", "...", "timestamp", "1234567890000"]]
# - last-entry: ["1234567890999-5", [...]]
# - length: 1000
# 2. Encode first entry ID to Kafka offset
# For "1234567890123-0" with sequenceBits=10:
# offset = (1234567890123 << 10) | 0 = 1264197008485888
# 3. Extract timestamp from first entry fields
# From entry: "timestamp" field = "1234567890000"
# Get stream info to find last entry
XINFO STREAM korvet:orders:0
# Returns stream metadata including:
# - last-entry: ["1234567890999-5", [...]]
# Encode last entry ID to Kafka offset
# For "1234567890999-5" with sequenceBits=10:
# offset = (1234567890999 << 10) | 5
# Timestamp is -1 (convention for "latest")
# 1. Convert timestamp to Redis Stream ID format (timestamp in ms)
# Kafka timestamp: 1234567890000
# Redis Stream ID: 1234567890000-0 (timestamp-sequence)
# 2. Find first entry at or after timestamp
XREAD COUNT 1 STREAMS korvet:orders:2 1234567890000-0
# Returns: [["korvet:orders:2", [["1234567890500-0", ["key", "...", "timestamp", "1234567890500"]]]]]
# 3. Encode entry ID to Kafka offset
# For "1234567890500-0" with sequenceBits=10:
# offset = (1234567890500 << 10) | 0
# 4. Extract timestamp from entry
# From entry: "timestamp" field = "1234567890500"
# Alternative: If no entry found at/after timestamp
XREAD COUNT 1 STREAMS korvet:orders:2 1234567890000-0
# Returns: [] (empty) or [["korvet:orders:2", []]]
# Response: offset = 0, timestamp = -1
Implementation Notes
-
Earliest Offset (
timestamp = -2):-
Use
XINFO STREAMto get first-entry from stream metadata -
Encode entry ID to Kafka offset
-
Return timestamp from entry’s timestamp field
-
If stream empty or doesn’t exist: return offset 0, timestamp -1
-
-
Latest Offset (
timestamp = -1):-
Use
XINFO STREAMto get last-entry from stream metadata -
Encode entry ID to Kafka offset
-
Timestamp is always -1 (convention for "latest")
-
-
Specific Timestamp:
-
Convert Kafka timestamp (ms) to Redis Stream ID format:
{timestamp}-0 -
Use
XREAD COUNT 1 STREAMS stream {timestamp}-0to find first entry at/after timestamp -
If no entry found: return offset 0 with timestamp -1
-
Extract actual timestamp from entry fields
-
-
Empty Partition:
-
XINFO STREAMreturns error if stream doesn’t exist -
Return: offset 0, timestamp -1
-
-
Performance:
XINFO STREAMis O(1) for finding earliest/latest offsets