This version is still in development and is not considered stable yet. For the latest stable version, please use Korvet 0.12.5.
Producer: Send Messages
This page describes how Korvet handles Kafka ProduceRequest operations.
This page is implementation-oriented and focuses on Kafka-to-Redis mapping details.
Producer Sends Messages
Kafka Wire Protocol
ProduceRequest {
  transactional_id: null
  acks: 1 // 0=no ack, 1=leader ack, -1=all in-sync replicas
  timeout_ms: 30000
  topics: [
    {
      name: "orders"
      partitions: [
        {
          index: 0
          records: [
            {
              offset: null // assigned by broker
              timestamp: 1234567890000
              key: "order-123"
              value: {"product": "widget", "quantity": 5}
              headers: [
                {key: "source", value: "web"},
                {key: "version", value: "1.0"}
              ]
            },
            {
              offset: null
              timestamp: 1234567891000
              key: "order-124"
              value: {"product": "gadget", "quantity": 3}
              headers: []
            }
          ]
        }
      ]
    }
  ]
}
ProduceResponse {
  topics: [
    {
      name: "orders"
      partitions: [
        {
          index: 0
          error_code: 0 // NONE
          base_offset: 42 // first offset assigned
          log_append_time: 1234567890123
          log_start_offset: 0
        }
      ]
    }
  ]
  throttle_time_ms: 0
}
Redis Commands Detail
Produce handling translates records into one or more XADD operations on the target stream key.
The returned Redis entry IDs are converted to Kafka offsets using the topic’s configured offset mapper.
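To make the record-to-XADD translation concrete, here is a minimal sketch of how a single record could be flattened into XADD arguments. The stream key layout (korvet:<topic>:<partition>) follows the example on this page; the field names (key, value, timestamp, header.<name>) and the helper names are assumptions chosen for illustration, since the page only shows "field value ..." and does not document Korvet's actual field layout.

```python
import json
from typing import Any, Dict, List


def stream_key(topic: str, partition: int) -> str:
    # Key layout taken from the XADD example on this page.
    return f"korvet:{topic}:{partition}"


def record_to_xadd(topic: str, partition: int, record: Dict[str, Any]) -> List[str]:
    """Build the argument list for one XADD call.

    The field names below are hypothetical; Korvet's real layout may differ.
    """
    fields = {
        "key": record["key"],
        "value": json.dumps(record["value"], separators=(",", ":")),
        "timestamp": str(record["timestamp"]),
    }
    # Flatten headers into prefixed stream fields (an assumed convention).
    for header in record.get("headers", []):
        fields[f"header.{header['key']}"] = header["value"]

    # "*" asks Redis to generate the entry ID itself.
    args = ["XADD", stream_key(topic, partition), "*"]
    for name, value in fields.items():
        args += [name, value]
    return args
```

Calling record_to_xadd("orders", 0, record) once per record yields one command per record, which matches the one-XADD-per-record shape described above.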
Implementation Notes
- Pipelining: Batch multiple XADD commands for better throughput.
- Chunked Writes: Large batches may be split into smaller chunks before writing.
- Idempotent Retry Handling: Duplicate producer batches can be detected and skipped before storage writes.
- Offset Encoding: Kafka offsets are derived from Redis entry IDs using the topic-specific sequenceBits configuration.
- Retention: Retention trimming is applied in the storage layer during write processing.
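The chunking and duplicate-detection notes above can be sketched as follows. This is an illustration only: the chunk size, the class name, and the in-memory set are assumptions, and the page does not say how Korvet identifies duplicates. The sketch uses the producer ID, producer epoch, and base sequence that Kafka's idempotent producer attaches to each batch.

```python
from typing import Iterator, List, Sequence, Set, Tuple, TypeVar

T = TypeVar("T")


def chunked(items: Sequence[T], size: int) -> Iterator[List[T]]:
    """Split a batch into consecutive chunks of at most `size` items."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield list(items[start:start + size])


class BatchDeduplicator:
    """Hypothetical duplicate check, run before any storage write."""

    def __init__(self) -> None:
        # An unbounded in-memory set keeps the sketch short; a real
        # implementation would need to bound or expire this state.
        self._seen: Set[Tuple[int, int, int]] = set()

    def is_duplicate(self, producer_id: int, producer_epoch: int, base_sequence: int) -> bool:
        ident = (producer_id, producer_epoch, base_sequence)
        if ident in self._seen:
            return True
        self._seen.add(ident)
        return False
```

A write path built on these helpers would first call is_duplicate for the incoming batch and, only if it returns False, iterate over chunked(records, size) and pipeline the XADD commands for each chunk.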
ProduceRequest {
  topics: [
    {
      name: "orders"
      partitions: [
        {
          index: 0
          records: [
            { key: "order-123", value: "...", timestamp: 1234567890, headers: {...} }
            { key: "order-124", value: "...", timestamp: 1234567891, headers: {...} }
          ]
        }
      ]
    }
  ]
}
XADD korvet:orders:0 * field value ...
XADD korvet:orders:0 * field value ...
- Redis returns entry IDs: ["1234567890123-0", "1234567890123-1"]
- Encode entry IDs as Kafka offsets: offset = (timestamp << sequenceBits) | sequence
- Return base_offset in ProduceResponse
# No mapping storage needed - conversion is stateless
# Entry ID to Kafka offset:
# offset = (timestamp << sequenceBits) | sequence
# Kafka offset to Entry ID:
# timestamp = offset >> sequenceBits
# sequence = offset & ((1 << sequenceBits) - 1)
# entry_id = "{timestamp}-{sequence}"
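The stateless conversion above can be written as a pair of small functions. This is a sketch of the formulas as stated on this page, not Korvet's actual code: the function names are hypothetical, and the two range checks are additions that make the failure cases explicit (a sequence number that does not fit in sequenceBits, or a result that exceeds Kafka's signed 64-bit offset range).

```python
MAX_KAFKA_OFFSET = (1 << 63) - 1  # Kafka offsets are signed 64-bit integers


def entry_id_to_offset(entry_id: str, sequence_bits: int) -> int:
    """Encode a Redis entry ID ("<timestamp>-<sequence>") as a Kafka offset."""
    timestamp_str, sequence_str = entry_id.split("-")
    timestamp, sequence = int(timestamp_str), int(sequence_str)
    # Added guard: a larger sequence would spill into the timestamp bits.
    if sequence >= (1 << sequence_bits):
        raise ValueError(f"sequence {sequence} does not fit in {sequence_bits} bits")
    offset = (timestamp << sequence_bits) | sequence
    # Added guard: the shifted value must still be a valid Kafka offset.
    if offset > MAX_KAFKA_OFFSET:
        raise ValueError("encoded offset exceeds the signed 64-bit range")
    return offset


def offset_to_entry_id(offset: int, sequence_bits: int) -> str:
    """Decode a Kafka offset back into a Redis entry ID."""
    timestamp = offset >> sequence_bits
    sequence = offset & ((1 << sequence_bits) - 1)
    return f"{timestamp}-{sequence}"
```

With an assumed sequence_bits of 10, offset_to_entry_id(entry_id_to_offset("1234567890123-1", 10), 10) returns "1234567890123-1", and the offsets of the two entry IDs in the example differ by exactly 1. The base_offset returned in the ProduceResponse would be the encoded offset of the first entry ID in the batch.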