This version is still in development and is not considered stable yet. For the latest stable version, please use Korvet 0.12.5!

Producer: Send Messages

This page describes how Korvet handles Kafka ProduceRequest operations. It is implementation-oriented and focuses on the Kafka-to-Redis mapping details.

Producer Sends Messages

Kafka Wire Protocol

ProduceRequest (API Key: 0)
ProduceRequest {
  transactional_id: null
  acks: 1                    // 0=no ack, 1=leader ack, -1=all in-sync replicas
  timeout_ms: 30000
  topics: [
    {
      name: "orders"
      partitions: [
        {
          index: 0
          records: [
            {
              offset: null   // assigned by broker
              timestamp: 1234567890000
              key: "order-123"
              value: {"product": "widget", "quantity": 5}
              headers: [
                {key: "source", value: "web"},
                {key: "version", value: "1.0"}
              ]
            },
            {
              offset: null
              timestamp: 1234567891000
              key: "order-124"
              value: {"product": "gadget", "quantity": 3}
              headers: []
            }
          ]
        }
      ]
    }
  ]
}
ProduceResponse (API Key: 0)
ProduceResponse {
  topics: [
    {
      name: "orders"
      partitions: [
        {
          index: 0
          error_code: 0        // NONE
          base_offset: 42      // first offset assigned
          log_append_time: 1234567890123
          log_start_offset: 0
        }
      ]
    }
  ]
  throttle_time_ms: 0
}

Sequence Diagram

ProduceRequest Sequence

Redis Commands Detail

Produce handling translates records into one or more XADD operations on the target stream key. The returned Redis entry IDs are converted to Kafka offsets using the topic’s configured offset mapper.
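A minimal sketch of that translation step, using the `korvet:{topic}:{partition}` key pattern shown in the worked example below. The field layout (`key`, `value`, `timestamp`, JSON-encoded `headers`) is an assumption for illustration; Korvet's actual on-stream schema may differ.

```python
import json

def record_to_xadd_args(topic: str, partition: int, record: dict):
    """Translate one Kafka record into (stream_key, fields) for XADD.

    The field names here are illustrative, not Korvet's real schema.
    """
    stream_key = f"korvet:{topic}:{partition}"
    fields = {
        "key": record.get("key") or "",
        "value": record["value"],
        "timestamp": str(record["timestamp"]),
        "headers": json.dumps(record.get("headers", [])),
    }
    return stream_key, fields

# The server would then issue: XADD <stream_key> * k1 v1 k2 v2 ...
```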

Implementation Notes

  • Pipelining: Multiple XADD commands are pipelined into a single round trip for better throughput.

  • Chunked Writes: Large batches may be split into smaller chunks before writing.

  • Idempotent Retry Handling: Duplicate producer batches can be detected and skipped before storage writes.

  • Offset Encoding: Kafka offsets are derived from Redis entry IDs using the topic-specific sequenceBits configuration.

  • Retention: Retention trimming is applied in the storage layer during write processing.
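The chunked-writes behavior can be sketched as a plain splitting step applied before the pipelined XADD calls. The chunk size of 100 is an illustrative value, not Korvet's configured default.

```python
def chunk_records(records: list, max_chunk: int = 100) -> list:
    """Split a large produce batch into bounded chunks so each
    pipelined burst of XADD commands stays a manageable size."""
    return [records[i:i + max_chunk] for i in range(0, len(records), max_chunk)]

# Each chunk would then be sent as one pipeline of XADD commands.
```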

Kafka ProduceRequest
ProduceRequest {
  topics: [
    {
      name: "orders"
      partitions: [
        {
          index: 0
          records: [
            { key: "order-123", value: "...", timestamp: 1234567890000, headers: [...] },
            { key: "order-124", value: "...", timestamp: 1234567891000, headers: [...] }
          ]
        }
      ]
    }
  ]
}
Redis Commands
XADD korvet:orders:0 * field value ...
XADD korvet:orders:0 * field value ...
Response Mapping
  1. Redis returns entry IDs: ["1234567890123-0", "1234567890123-1"]

  2. Encode entry IDs as Kafka offsets: offset = (timestamp << sequenceBits) | sequence

  3. Return base_offset in ProduceResponse
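The three steps above can be sketched directly. `SEQUENCE_BITS = 20` is an illustrative value here; Korvet reads the actual value from the topic's sequenceBits configuration.

```python
SEQUENCE_BITS = 20  # illustrative; the real value is per-topic configuration

def entry_id_to_offset(entry_id: str, sequence_bits: int = SEQUENCE_BITS) -> int:
    """Encode a Redis stream entry ID ("<millis>-<seq>") as a Kafka offset."""
    timestamp, sequence = (int(part) for part in entry_id.split("-"))
    return (timestamp << sequence_bits) | sequence

# Step 1: entry IDs returned by the XADD calls
ids = ["1234567890123-0", "1234567890123-1"]
# Step 2: encode each entry ID as a Kafka offset
offsets = [entry_id_to_offset(i) for i in ids]
# Step 3: the first offset becomes base_offset in the ProduceResponse
base_offset = offsets[0]
```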

Offset-to-Entry-ID Mapping
# No mapping storage needed - conversion is stateless
# Entry ID to Kafka offset:
#   offset = (timestamp << sequenceBits) | sequence
# Kafka offset to Entry ID:
#   timestamp = offset >> sequenceBits
#   sequence = offset & ((1 << sequenceBits) - 1)
#   entry_id = "{timestamp}-{sequence}"
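Because the mapping is stateless, both directions can be written as pure functions and round-tripped. As above, `SEQUENCE_BITS = 20` is an illustrative stand-in for the topic's sequenceBits setting.

```python
SEQUENCE_BITS = 20  # illustrative stand-in for the per-topic sequenceBits

def entry_id_to_offset(entry_id: str, sequence_bits: int = SEQUENCE_BITS) -> int:
    """Redis entry ID ("<millis>-<seq>") -> Kafka offset."""
    timestamp, sequence = (int(part) for part in entry_id.split("-"))
    return (timestamp << sequence_bits) | sequence

def offset_to_entry_id(offset: int, sequence_bits: int = SEQUENCE_BITS) -> str:
    """Kafka offset -> Redis entry ID, inverting the encoding above."""
    timestamp = offset >> sequence_bits
    sequence = offset & ((1 << sequence_bits) - 1)
    return f"{timestamp}-{sequence}"

# Round trip: no mapping table is needed in either direction.
assert offset_to_entry_id(entry_id_to_offset("1234567890123-1")) == "1234567890123-1"
```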