# Producer: Send Messages

This page describes how Korvet handles Kafka ProduceRequest operations.

## Producer Sends Messages

### Kafka Wire Protocol
```
ProduceRequest {
  transactional_id: null
  acks: 1              // 0=no ack, 1=leader ack, -1=all replicas
  timeout_ms: 30000
  topics: [
    {
      name: "orders"
      partitions: [
        {
          index: 0
          records: [
            {
              offset: null   // assigned by broker
              timestamp: 1234567890000
              key: "order-123"
              value: {"product": "widget", "quantity": 5}
              headers: [
                {key: "source", value: "web"},
                {key: "version", value: "1.0"}
              ]
            },
            {
              offset: null
              timestamp: 1234567891000
              key: "order-124"
              value: {"product": "gadget", "quantity": 3}
              headers: []
            }
          ]
        }
      ]
    }
  ]
}

ProduceResponse {
  topics: [
    {
      name: "orders"
      partitions: [
        {
          index: 0
          error_code: 0              // NONE
          base_offset: 42            // first offset assigned
          log_append_time: 1234567890123
          log_start_offset: 0
        }
      ]
    }
  ]
  throttle_time_ms: 0
}
```
### Redis Commands Detail
```
# 1. Add first message to stream
XADD korvet:orders:0 * \
  key "order-123" \
  value "{\"product\":\"widget\",\"quantity\":5}" \
  timestamp "1234567890000" \
  header.source "web" \
  header.version "1.0"
# Returns: "1234567890123-0"

# 2. Add second message to stream
XADD korvet:orders:0 * \
  key "order-124" \
  value "{\"product\":\"gadget\",\"quantity\":3}" \
  timestamp "1234567891000"
# Returns: "1234567890123-1"

# Kafka offsets are computed from entry IDs:
#   offset = (timestamp << sequenceBits) | sequence
#
# For entry "1234567890123-0" with sequenceBits=10:
#   offset = (1234567890123 << 10) + 0 = 1264197519485952
#
# For entry "1234567890123-1" with sequenceBits=10:
#   offset = (1234567890123 << 10) + 1 = 1264197519485953
```
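The entry-ID arithmetic above can be sketched in Python. This is a hedged illustration: the function name is my own, and `sequence_bits=10` simply follows the example.

```python
def entry_id_to_offset(entry_id: str, sequence_bits: int = 10) -> int:
    """Convert a Redis stream entry ID ("ms-seq") into a Kafka-style offset."""
    ms, seq = entry_id.split("-")
    return (int(ms) << sequence_bits) | int(seq)

print(entry_id_to_offset("1234567890123-0"))  # 1264197519485952
print(entry_id_to_offset("1234567890123-1"))  # 1264197519485953
```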
## Implementation Notes
- **Pipelining**: Batch multiple XADD commands for better throughput
- **Acks Handling**:
  - `acks=0`: Return immediately without waiting for the Redis response
  - `acks=1`: Wait for Redis confirmation (default)
  - `acks=-1`: Same as `acks=1` in a single-node setup
- **Error Handling**:
  - If the topic doesn't exist: Return `UNKNOWN_TOPIC_OR_PARTITION` (error code 3)
  - If the partition doesn't exist: Return `UNKNOWN_TOPIC_OR_PARTITION`
  - If Redis fails: Return `KAFKA_STORAGE_ERROR` (error code 56)
- **Offset Encoding**: Kafka offset = `(timestamp << sequenceBits) | sequence`
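One way to picture how the acks and error cases above fit together is a small Python sketch. Everything here is illustrative, not Korvet's actual API: the handler name, the `redis_xadd` callable (a stand-in for issuing XADD), and the partition lookup are all assumptions.

```python
# Kafka error codes referenced in the notes above
NONE = 0
UNKNOWN_TOPIC_OR_PARTITION = 3
KAFKA_STORAGE_ERROR = 56

def produce_partition(redis_xadd, known_partitions, topic, partition, records, acks=1):
    """Hypothetical per-partition produce handler.

    redis_xadd: callable(stream_key, fields) -> entry ID string "ms-seq"
    Returns (error_code, base_offset).
    """
    if (topic, partition) not in known_partitions:
        return UNKNOWN_TOPIC_OR_PARTITION, -1
    stream = f"korvet:{topic}:{partition}"
    base_offset = -1
    try:
        for i, record in enumerate(records):
            entry_id = redis_xadd(stream, record)
            if i == 0:
                # base_offset is the offset of the first appended record
                ms, seq = entry_id.split("-")
                base_offset = (int(ms) << 10) | int(seq)  # sequenceBits = 10
    except ConnectionError:
        return KAFKA_STORAGE_ERROR, -1
    # acks=0 would skip waiting for Redis entirely in a real handler;
    # the offset math is the same either way.
    return NONE, base_offset
```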
Putting it together, a ProduceRequest like this:

```
ProduceRequest {
  topics: [
    {
      name: "orders"
      partitions: [
        {
          index: 0
          records: [
            { key: "order-123", value: "...", timestamp: 1234567890, headers: {...} }
            { key: "order-124", value: "...", timestamp: 1234567891, headers: {...} }
          ]
        }
      ]
    }
  ]
}
```
```
# For each message in partition 0 of topic "orders"
XADD korvet:orders:0 * \
  key "order-123" \
  value "..." \
  timestamp "1234567890" \
  headers "{...}"

XADD korvet:orders:0 * \
  key "order-124" \
  value "..." \
  timestamp "1234567891" \
  headers "{...}"
```
- Redis returns the entry IDs: `["1234567890123-0", "1234567890123-1"]`
- Encode the entry IDs as Kafka offsets: `offset = (timestamp << sequenceBits) | sequence`
- Return `base_offset` in the ProduceResponse
```
# No mapping storage needed - conversion is stateless
# Entry ID to Kafka offset:
#   offset = (timestamp << sequenceBits) | sequence
# Kafka offset to Entry ID:
#   timestamp = offset >> sequenceBits
#   sequence = offset & ((1 << sequenceBits) - 1)
#   entry_id = "{timestamp}-{sequence}"
```
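The stateless round trip described above can be sketched as follows. Function names are illustrative and `sequence_bits=10` follows the earlier example; the scheme assumes the per-millisecond sequence stays below `2**sequence_bits`.

```python
def entry_id_to_offset(entry_id: str, sequence_bits: int = 10) -> int:
    """Redis entry ID "ms-seq" -> Kafka offset."""
    ms, seq = entry_id.split("-")
    return (int(ms) << sequence_bits) | int(seq)

def offset_to_entry_id(offset: int, sequence_bits: int = 10) -> str:
    """Kafka offset -> Redis entry ID "ms-seq"."""
    ms = offset >> sequence_bits
    seq = offset & ((1 << sequence_bits) - 1)
    return f"{ms}-{seq}"

# Round trip: an entry ID survives encode -> decode unchanged
print(offset_to_entry_id(entry_id_to_offset("1234567890123-1")))  # 1234567890123-1
```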