This version is still in development and is not considered stable yet. For the latest stable version, please use Korvet 0.12.5!

Topic Management

This page describes how Korvet handles Kafka topic management operations. It is implementation-oriented and focuses on how each Kafka request maps onto Redis.

Create Topic

Kafka Wire Protocol

CreateTopicsRequest (API Key: 19)
CreateTopicsRequest {
  topics: [
    {
      name: "orders"
      num_partitions: 3
      replication_factor: 1      // ignored in single-node setup
      assignments: []            // empty = auto-assign
      configs: [
        {name: "retention.ms", value: "86400000"},        // 1 day
        {name: "retention.bytes", value: "1073741824"},   // 1 GB
        {name: "compression.type", value: "lz4"},
        {name: "storage.compression", value: "zstd"}
      ]
    }
  ]
  timeout_ms: 60000
  validate_only: false         // true = validate but don't create
}
CreateTopicsResponse (API Key: 19)
CreateTopicsResponse {
  throttle_time_ms: 0
  topics: [
    {
      name: "orders"
      error_code: 0              // NONE
      error_message: null
      num_partitions: 3
      replication_factor: 1
      configs: [
        {name: "retention.ms", value: "86400000", read_only: false, is_default: false},
        {name: "retention.bytes", value: "1073741824", read_only: false, is_default: false},
        {name: "compression.type", value: "lz4", read_only: false, is_default: false},
        {name: "storage.compression", value: "zstd", read_only: false, is_default: false}
      ]
    }
  ]
}

Sequence Diagram

CreateTopicsRequest Sequence

Redis Commands Detail

Step-by-step Redis Operations
# Topic creation is primarily a broker-side operation
# Topic metadata (num_partitions, retention, compression, etc.) is stored
# in Redis hashes under the keyspace prefix

# Redis Streams for partitions are created lazily on first XADD:
# - korvet:orders:0 (created when first message produced to partition 0)
# - korvet:orders:1 (created when first message produced to partition 1)
# - korvet:orders:2 (created when first message produced to partition 2)

# No initialization commands needed - streams are auto-created by Redis
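The key layout listed above can be sketched as a small helper. This is only an illustration of the naming pattern shown in the comments (korvet:orders:0 and so on). The function names and the default prefix argument are hypothetical and are not taken from Korvet's code.

```python
def partition_stream_key(topic: str, partition: int, prefix: str = "korvet") -> str:
    """Build the Redis Stream key for one partition, e.g. korvet:orders:0."""
    if partition < 0:
        raise ValueError("partition must be >= 0")
    return f"{prefix}:{topic}:{partition}"


def partition_stream_keys(topic: str, num_partitions: int, prefix: str = "korvet") -> list:
    """List the keys a topic may eventually use.

    Nothing is created in Redis here: per the notes above, each stream
    only comes into existence on the first XADD to that key.
    """
    return [partition_stream_key(topic, p, prefix) for p in range(num_partitions)]
```

For the "orders" topic with three partitions, partition_stream_keys("orders", 3) yields the three keys listed in the comments above.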

Implementation Notes

  • Topic Name Validation:

    • Length: 1-249 characters

    • Allowed chars: a-z, A-Z, 0-9, ., _, -

    • Cannot be . or ..

    • Return INVALID_TOPIC_EXCEPTION (error code 17) if invalid

  • Topic Already Exists:

    • Check the topic registry, which persists topic metadata and maintains local caches

    • Return TOPIC_ALREADY_EXISTS (error code 36) if found

  • Partition Count:

    • Must be > 0

  • Replication Factor / Assignments:

    • Replica assignments are rejected

    • Kafka replication factor is not mapped onto Redis placement by this handler

  • Configuration Defaults:

    • Defaults come from the runtime topic-registry configuration

    • Request-level configs override those defaults before persistence

  • Metadata Storage: Topic metadata is persisted through the topic registry in Redis-backed metadata structures

  • Lazy Stream Creation:

    • Redis Streams created automatically on first XADD

    • Saves memory for unused topics

  • Validate Only: If validate_only=true, perform all validation but do not create the topic
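The checks above can be sketched as a single function. This is a minimal illustration, not Korvet's handler: the registry is modeled as a plain dict, the order of the checks is assumed, and the error codes for a bad partition count (37, INVALID_PARTITIONS) and for replica assignments (39, INVALID_REPLICA_ASSIGNMENT) are the standard Kafka codes, which this page does not confirm Korvet returns.

```python
import re

NONE = 0
INVALID_TOPIC_EXCEPTION = 17
TOPIC_ALREADY_EXISTS = 36
INVALID_PARTITIONS = 37          # assumption: standard Kafka code
INVALID_REPLICA_ASSIGNMENT = 39  # assumption: standard Kafka code

# 1-249 characters drawn from a-z, A-Z, 0-9, '.', '_', '-'
_TOPIC_NAME = re.compile(r"[A-Za-z0-9._-]{1,249}")


def is_valid_topic_name(name: str) -> bool:
    if name in (".", ".."):
        return False
    return _TOPIC_NAME.fullmatch(name) is not None


def create_topic(registry, defaults, name, num_partitions,
                 assignments=(), configs=None, validate_only=False) -> int:
    """Validate a create request and return a Kafka error code.

    `registry` is a hypothetical dict standing in for the topic registry.
    """
    if not is_valid_topic_name(name):
        return INVALID_TOPIC_EXCEPTION
    if name in registry:
        return TOPIC_ALREADY_EXISTS
    if num_partitions <= 0:
        return INVALID_PARTITIONS
    if assignments:
        return INVALID_REPLICA_ASSIGNMENT
    # Request-level configs override the runtime defaults.
    merged = {**defaults, **(configs or {})}
    if not validate_only:
        registry[name] = {"num_partitions": num_partitions, "configs": merged}
    return NONE
```

With validate_only=True the function runs every check and returns the same code it would otherwise, but leaves the registry untouched.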

Delete Topic

Kafka Wire Protocol

DeleteTopicsRequest (API Key: 20)
DeleteTopicsRequest {
  topic_names: ["orders", "payments"]
  timeout_ms: 30000
}
DeleteTopicsResponse (API Key: 20)
DeleteTopicsResponse {
  throttle_time_ms: 0
  topics: [
    {
      name: "orders"
      error_code: 0            // NONE
      error_message: null
    },
    {
      name: "payments"
      error_code: 3            // UNKNOWN_TOPIC_OR_PARTITION
      error_message: "Topic 'payments' does not exist"
    }
  ]
}

Sequence Diagram

DeleteTopicsRequest Sequence

Redis Commands Detail

DeleteTopics removes each topic through the topic registry and returns a per-topic result. The request path itself does not guarantee synchronous deletion of every stream key or consumer-group artifact; whether that happens depends on the underlying registry and storage implementation.

Implementation Notes

  • Topic Doesn’t Exist: Return UNKNOWN_TOPIC_OR_PARTITION (error code 3)

  • Deletion Scope: The request path removes the topic from the topic registry. Stronger cleanup guarantees should be documented at the storage-layer level, not assumed here.
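The per-topic result handling can be sketched as follows. This is an illustration under stated assumptions: the registry is modeled as a plain dict, the function name is hypothetical, and the sketch only removes the registry entry, leaving any stream or consumer-group cleanup out of scope as the notes above describe.

```python
NONE = 0
UNKNOWN_TOPIC_OR_PARTITION = 3


def delete_topics(registry: dict, topic_names: list) -> list:
    """Remove topics from a dict-based registry and report one result per topic."""
    results = []
    for name in topic_names:
        if name in registry:
            del registry[name]
            results.append(
                {"name": name, "error_code": NONE, "error_message": None}
            )
        else:
            results.append(
                {
                    "name": name,
                    "error_code": UNKNOWN_TOPIC_OR_PARTITION,
                    "error_message": f"Topic '{name}' does not exist",
                }
            )
    return results
```

A missing topic does not fail the whole request: each name gets its own entry, which matches the mixed response shown in the wire-protocol example.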

Metadata Request (List Topics)

Kafka Wire Protocol

MetadataRequest (API Key: 3)
MetadataRequest {
  topics: null                 // null = all topics, or specific topic list
  allow_auto_topic_creation: false
  include_cluster_authorized_operations: false
  include_topic_authorized_operations: false
}
MetadataResponse (API Key: 3)
MetadataResponse {
  throttle_time_ms: 0
  brokers: [
    {
      node_id: 0
      host: "localhost"
      port: 9092
      rack: null
    }
  ]
  cluster_id: "korvet-cluster-1"
  controller_id: 0
  topics: [
    {
      error_code: 0
      name: "orders"
      is_internal: false
      partitions: [
        {
          error_code: 0
          partition: 0
          leader: 0              // broker node_id
          replica_nodes: [0]
          isr_nodes: [0]         // in-sync replicas
          offline_replicas: []
        },
        {
          error_code: 0
          partition: 1
          leader: 0
          replica_nodes: [0]
          isr_nodes: [0]
          offline_replicas: []
        },
        {
          error_code: 0
          partition: 2
          leader: 0
          replica_nodes: [0]
          isr_nodes: [0]
          offline_replicas: []
        }
      ]
    },
    {
      error_code: 0
      name: "payments"
      is_internal: false
      partitions: [
        {
          error_code: 0
          partition: 0
          leader: 0
          replica_nodes: [0]
          isr_nodes: [0]
          offline_replicas: []
        }
      ]
    }
  ]
}

Sequence Diagram

MetadataRequest Sequence

Redis Commands Detail

Step-by-step Redis Operations
# Metadata requests are served from broker state backed by the topic registry.
# The request path does not scan Redis stream keys to discover topics.

Implementation Notes

  • All Topics vs Specific Topics:

    • If topics=null: Return metadata for all topics from broker registry

    • If topics=[…]: Return metadata only for the specified topics

    • Non-existent topics: Include in response with UNKNOWN_TOPIC_OR_PARTITION error

  • Broker Information:

    • Single-node setup: Always return one broker (node_id=0, this server)

    • Clustered setup: Return all Korvet broker nodes

  • Partition Leadership:

    • Single-node setup: the one broker leads every partition

    • Clustered setup: metadata includes all known brokers, and the current implementation reports the first broker as leader for each partition

  • Controller: controller_id is the local broker ID in the current handler implementation

  • Auto Topic Creation: Metadata resolution flows through the topic registry, so what happens for a missing topic depends on the runtime auto-create configuration as well as the request's allow_auto_topic_creation flag
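The response assembly described in these notes can be sketched as below. It is a simplified illustration, not the handler itself: the registry is modeled as a dict of topic name to partition count, the function and parameter names are hypothetical, replica and ISR lists are assumed to contain only the leader (as in the single-node example above), and auto topic creation is not modeled.

```python
from typing import List, Optional

UNKNOWN_TOPIC_OR_PARTITION = 3


def build_metadata(registry: dict, broker_ids: List[int], local_broker_id: int,
                   requested: Optional[List[str]] = None) -> dict:
    """Assemble a metadata response from registry state, without scanning Redis keys."""
    leader = broker_ids[0]  # first known broker leads every partition
    # requested=None means "all topics"; otherwise keep the caller's order.
    names = sorted(registry) if requested is None else list(requested)
    topics = []
    for name in names:
        if name not in registry:
            topics.append({"error_code": UNKNOWN_TOPIC_OR_PARTITION,
                           "name": name, "partitions": []})
            continue
        partitions = [
            {"error_code": 0, "partition": p, "leader": leader,
             "replica_nodes": [leader], "isr_nodes": [leader],
             "offline_replicas": []}
            for p in range(registry[name])
        ]
        topics.append({"error_code": 0, "name": name, "partitions": partitions})
    return {"brokers": list(broker_ids),
            "controller_id": local_broker_id,
            "topics": topics}
```

Passing requested=None returns every registered topic, while an explicit list returns one entry per requested name, with error code 3 for names the registry does not know.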