Producer: Send Messages

This page describes how Korvet handles Kafka ProduceRequest operations.

Producer Sends Messages

Kafka Wire Protocol

ProduceRequest (API Key: 0)
ProduceRequest {
  transactional_id: null
  acks: 1                    // 0=no ack, 1=leader ack, -1=all replicas
  timeout_ms: 30000
  topics: [
    {
      name: "orders"
      partitions: [
        {
          index: 0
          records: [
            {
              offset: null   // assigned by broker
              timestamp: 1234567890000
              key: "order-123"
              value: {"product": "widget", "quantity": 5}
              headers: [
                {key: "source", value: "web"},
                {key: "version", value: "1.0"}
              ]
            },
            {
              offset: null
              timestamp: 1234567891000
              key: "order-124"
              value: {"product": "gadget", "quantity": 3}
              headers: []
            }
          ]
        }
      ]
    }
  ]
}
ProduceResponse (API Key: 0)
ProduceResponse {
  topics: [
    {
      name: "orders"
      partitions: [
        {
          index: 0
          error_code: 0        // NONE
          base_offset: 42      // first offset assigned
          log_append_time: 1234567890123
          log_start_offset: 0
        }
      ]
    }
  ]
  throttle_time_ms: 0
}

Sequence Diagram

ProduceRequest Sequence

Redis Commands Detail

Step-by-step Redis Operations
# 1. Add first message to stream
XADD korvet:orders:0 * \
  key "order-123" \
  value "{\"product\":\"widget\",\"quantity\":5}" \
  timestamp "1234567890000" \
  header.source "web" \
  header.version "1.0"
# Returns: "1234567890123-0"

# 2. Add second message to stream
XADD korvet:orders:0 * \
  key "order-124" \
  value "{\"product\":\"gadget\",\"quantity\":3}" \
  timestamp "1234567891000"
# Returns: "1234567890123-1"

# Kafka offsets are computed from entry IDs:
# offset = (timestamp << sequenceBits) | sequence
#
# For entry "1234567890123-0" with sequenceBits=10:
#   offset = (1234567890123 << 10) + 0 = 1264197008485888
#
# For entry "1234567890123-1" with sequenceBits=10:
#   offset = (1234567890123 << 10) + 1 = 1264197008485889

Implementation Notes

  • Pipelining: Batch multiple XADD commands for better throughput

  • Acks Handling:

    • acks=0: Return immediately without waiting for Redis response

    • acks=1: Wait for Redis confirmation (default)

    • acks=-1: Same as acks=1 in single-node setup

  • Error Handling:

    • If topic doesn’t exist: Return UNKNOWN_TOPIC_OR_PARTITION (error code 3)

    • If partition doesn’t exist: Return UNKNOWN_TOPIC_OR_PARTITION

    • If Redis fails: Return KAFKA_STORAGE_ERROR (error code 56)

  • Offset Encoding: Kafka offset = (timestamp << sequenceBits) | sequence

Kafka ProduceRequest
ProduceRequest {
  topics: [
    {
      name: "orders"
      partitions: [
        {
          index: 0
          records: [
            { key: "order-123", value: "...", timestamp: 1234567890, headers: {...} }
            { key: "order-124", value: "...", timestamp: 1234567891, headers: {...} }
          ]
        }
      ]
    }
  ]
}
Redis Commands
# For each message in partition 0 of topic "orders"
XADD korvet:orders:0 * \
  key "order-123" \
  value "..." \
  timestamp "1234567890" \
  headers "{...}"

XADD korvet:orders:0 * \
  key "order-124" \
  value "..." \
  timestamp "1234567891" \
  headers "{...}"
Response Mapping
  1. Redis returns entry IDs: ["1234567890123-0", "1234567890123-1"]

  2. Encode entry IDs as Kafka offsets: offset = (timestamp << sequenceBits) | sequence

  3. Return base_offset in ProduceResponse

Offset-to-Entry-ID Mapping
# No mapping storage needed - conversion is stateless
# Entry ID to Kafka offset:
#   offset = (timestamp << sequenceBits) | sequence
# Kafka offset to Entry ID:
#   timestamp = offset >> sequenceBits
#   sequence = offset & ((1 << sequenceBits) - 1)
#   entry_id = "{timestamp}-{sequence}"