Topic Management

This page describes how Korvet handles Kafka topic management operations.

Create Topic

Kafka Wire Protocol

CreateTopicsRequest (API Key: 19)
CreateTopicsRequest {
  topics: [
    {
      name: "orders"
      num_partitions: 3
      replication_factor: 1      // ignored in single-node setup
      assignments: []            // empty = auto-assign
      configs: [
        {name: "retention.ms", value: "86400000"},        // 1 day
        {name: "retention.bytes", value: "1073741824"},   // 1 GB
        {name: "compression.type", value: "lz4"},
        {name: "storage.compression", value: "zstd"}
      ]
    }
  ]
  timeout_ms: 60000
  validate_only: false         // true = validate but don't create
}
CreateTopicsResponse (API Key: 19)
CreateTopicsResponse {
  throttle_time_ms: 0
  topics: [
    {
      name: "orders"
      error_code: 0              // NONE
      error_message: null
      num_partitions: 3
      replication_factor: 1
      configs: [
        {name: "retention.ms", value: "86400000", read_only: false, is_default: false},
        {name: "retention.bytes", value: "1073741824", read_only: false, is_default: false},
        {name: "compression.type", value: "lz4", read_only: false, is_default: false},
        {name: "storage.compression", value: "zstd", read_only: false, is_default: false}
      ]
    }
  ]
}

Sequence Diagram

CreateTopicsRequest Sequence

Redis Commands Detail

Step-by-step Redis Operations
# Topic creation is primarily a broker-side operation
# Topic metadata (num_partitions, retention, compression, etc.) is stored
# in Redis hashes under the keyspace prefix

# Redis Streams for partitions are created lazily on first XADD:
# - korvet:orders:0 (created when first message produced to partition 0)
# - korvet:orders:1 (created when first message produced to partition 1)
# - korvet:orders:2 (created when first message produced to partition 2)

# No initialization commands needed - streams are auto-created by Redis

Implementation Notes

  • Topic Name Validation:

    • Length: 1-249 characters

    • Allowed chars: a-z, A-Z, 0-9, ., _, -

    • Cannot be . or ..

    • Cannot start with __ (reserved for internal topics)

    • Return INVALID_TOPIC_EXCEPTION (error code 17) if invalid

  • Topic Already Exists:

    • Check broker’s in-memory topic registry

    • Return TOPIC_ALREADY_EXISTS (error code 36) if found

  • Partition Count:

    • Must be > 0

    • Default: 1 if not specified

    • Immutable after creation

  • Replication Factor:

    • Ignored in single-node setup (always 1)

    • In clustered setup: would determine Redis replication

  • Configuration Defaults:

    • retention.ms: 604800000 (7 days)

    • retention.bytes: -1 (unlimited)

    • compression.type: none

    • storage.compression: lz4

  • Metadata Storage: Topic metadata stored in broker memory or external config, not in Redis

  • Lazy Stream Creation:

    • Redis Streams created automatically on first XADD

    • Saves memory for unused topics

  • Validate Only: If validate_only=true, perform all validation but don’t create topic

Delete Topic

Kafka Wire Protocol

DeleteTopicsRequest (API Key: 20)
DeleteTopicsRequest {
  topic_names: ["orders", "payments"]
  timeout_ms: 30000
}
DeleteTopicsResponse (API Key: 20)
DeleteTopicsResponse {
  throttle_time_ms: 0
  topics: [
    {
      name: "orders"
      error_code: 0            // NONE
      error_message: null
    },
    {
      name: "payments"
      error_code: 3            // UNKNOWN_TOPIC_OR_PARTITION
      error_message: "Topic 'payments' does not exist"
    }
  ]
}

Sequence Diagram

DeleteTopicsRequest Sequence

Redis Commands Detail

Step-by-step Redis Operations
# 1. Verify topic exists in broker metadata (not Redis)
# If not exists: return UNKNOWN_TOPIC_OR_PARTITION (error code 3)

# 2. Get partition count from broker metadata
# For topic "orders" with 3 partitions

# 3. Destroy consumer groups for each partition
# For partition 0:
XGROUP DESTROY korvet:orders:0 my-group
XGROUP DESTROY korvet:orders:0 other-group
# For partition 1:
XGROUP DESTROY korvet:orders:1 my-group
XGROUP DESTROY korvet:orders:1 other-group
# For partition 2:
XGROUP DESTROY korvet:orders:2 my-group
XGROUP DESTROY korvet:orders:2 other-group
# Note: XGROUP DESTROY returns error if group doesn't exist - ignore errors

# 4. Delete partition streams
DEL korvet:orders:0
DEL korvet:orders:1
DEL korvet:orders:2

# 5. Remove topic from broker metadata (Redis hash)

# Alternative: Use SCAN to find all partition streams (safer)
SCAN 0 MATCH korvet:orders:* COUNT 100 TYPE stream
# Returns: cursor and list of stream keys
# Then DEL all found keys

Implementation Notes

  • Topic Doesn’t Exist: Return UNKNOWN_TOPIC_OR_PARTITION (error code 3)

  • Active Consumers:

    • Kafka typically prevents deletion if consumers are active

    • Option 1: Check for active consumers and return error

    • Option 2: Force delete and let consumers handle errors

  • Consumer Group Cleanup:

    • Must destroy Redis consumer groups for all partitions

    • Use XGROUP DESTROY for each group/partition combination

    • Ignore errors if group doesn’t exist for a partition

  • Atomicity:

    • Use Lua script to ensure atomic deletion

    • If any step fails, entire operation should fail

  • Partial Deletion: If deletion fails midway, topic may be in inconsistent state

    • Implement cleanup/recovery mechanism

    • Or use transaction with rollback capability

  • Large Topics: For topics with many partitions or large streams:

    • Consider async deletion (mark for deletion, clean up in background)

    • Use UNLINK instead of DEL for non-blocking deletion

  • Cascading Deletes: Also delete group offset commits for this topic: redis # For each group, remove committed offsets for this topic’s partitions HDEL korvet:group:my-group:offsets korvet:orders:0 korvet:orders:1 korvet:orders:2

Metadata Request (List Topics)

Kafka Wire Protocol

MetadataRequest (API Key: 3)
MetadataRequest {
  topics: null                 // null = all topics, or specific topic list
  allow_auto_topic_creation: false
  include_cluster_authorized_operations: false
  include_topic_authorized_operations: false
}
MetadataResponse (API Key: 3)
MetadataResponse {
  throttle_time_ms: 0
  brokers: [
    {
      node_id: 0
      host: "localhost"
      port: 9092
      rack: null
    }
  ]
  cluster_id: "korvet-cluster-1"
  controller_id: 0
  topics: [
    {
      error_code: 0
      name: "orders"
      is_internal: false
      partitions: [
        {
          error_code: 0
          partition: 0
          leader: 0              // broker node_id
          replica_nodes: [0]
          isr_nodes: [0]         // in-sync replicas
          offline_replicas: []
        },
        {
          error_code: 0
          partition: 1
          leader: 0
          replica_nodes: [0]
          isr_nodes: [0]
          offline_replicas: []
        },
        {
          error_code: 0
          partition: 2
          leader: 0
          replica_nodes: [0]
          isr_nodes: [0]
          offline_replicas: []
        }
      ]
    },
    {
      error_code: 0
      name: "payments"
      is_internal: false
      partitions: [
        {
          error_code: 0
          partition: 0
          leader: 0
          replica_nodes: [0]
          isr_nodes: [0]
          offline_replicas: []
        }
      ]
    }
  ]
}

Sequence Diagram

MetadataRequest Sequence

Redis Commands Detail

Step-by-step Redis Operations
# Metadata requests are handled by broker's in-memory topic registry
# No Redis commands needed

# Broker returns topic metadata from memory/config:
# - Topic names
# - Partition counts
# - Retention policies
# - Compression settings
# - etc.

# Optional: Use SCAN to discover existing partition streams
SCAN 0 MATCH korvet:*:* COUNT 100 TYPE stream
# Returns: cursor and list of stream keys like ["korvet:orders:0", "korvet:orders:1", "korvet:payments:0"]
# Can be used to verify which topics/partitions actually have data

Implementation Notes

  • All Topics vs Specific Topics:

    • If topics=null: Return metadata for all topics from broker registry

    • If topics=[…​]: Return metadata only for specified topics

    • Non-existent topics: Include in response with UNKNOWN_TOPIC_OR_PARTITION error

  • Broker Information:

    • Single-node setup: Always return one broker (node_id=0, this server)

    • Clustered setup: Return all Korvet broker nodes

  • Partition Leadership:

    • Single-node: All partitions have leader=0, replicas=[0], isr=[0]

    • Clustered: Distribute partition leadership across brokers

  • Controller:

    • Single-node: controller_id=0 (this broker)

    • Clustered: Elect controller using Redis or external coordination

  • Auto Topic Creation:

    • If allow_auto_topic_creation=true and topic doesn’t exist:

    • Create topic with default settings

    • Return metadata for newly created topic

    • If false: Return error for non-existent topics

  • Performance: Use pipelining to fetch metadata for multiple topics in parallel

  • Caching: Cache topic metadata in broker memory, invalidate on topic changes