Topic Management
This page describes how Korvet handles Kafka topic management operations.
Create Topic
Kafka Wire Protocol
CreateTopicsRequest {
  topics: [
    {
      name: "orders"
      num_partitions: 3
      replication_factor: 1  // ignored in single-node setup
      assignments: []        // empty = auto-assign
      configs: [
        {name: "retention.ms", value: "86400000"},       // 1 day
        {name: "retention.bytes", value: "1073741824"},  // 1 GB
        {name: "compression.type", value: "lz4"},
        {name: "storage.compression", value: "zstd"}
      ]
    }
  ]
  timeout_ms: 60000
  validate_only: false  // true = validate but don't create
}

CreateTopicsResponse {
  throttle_time_ms: 0
  topics: [
    {
      name: "orders"
      error_code: 0  // NONE
      error_message: null
      num_partitions: 3
      replication_factor: 1
      configs: [
        {name: "retention.ms", value: "86400000", read_only: false, is_default: false},
        {name: "retention.bytes", value: "1073741824", read_only: false, is_default: false},
        {name: "compression.type", value: "lz4", read_only: false, is_default: false},
        {name: "storage.compression", value: "zstd", read_only: false, is_default: false}
      ]
    }
  ]
}
Redis Commands Detail
# Topic creation is primarily a broker-side operation
# Topic metadata (num_partitions, retention, compression, etc.) is held in the
# broker's in-memory topic registry and may be persisted as Redis hashes
# under the keyspace prefix
# Redis Streams for partitions are created lazily on first XADD:
# - korvet:orders:0 (created when first message produced to partition 0)
# - korvet:orders:1 (created when first message produced to partition 1)
# - korvet:orders:2 (created when first message produced to partition 2)
# No initialization commands needed - streams are auto-created by Redis
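The `korvet:<topic>:<partition>` naming above can be captured in a small helper. This is a sketch with assumed function names (`stream_key`, `partition_keys`), not the actual Korvet code; it only builds the key strings, since Redis materializes each stream lazily on the first XADD.

```python
# Sketch of the partition stream-key naming scheme described above.
# Helper names are assumptions for illustration.

def stream_key(topic: str, partition: int, prefix: str = "korvet") -> str:
    """Build the Redis Stream key for one topic partition."""
    return f"{prefix}:{topic}:{partition}"

def partition_keys(topic: str, num_partitions: int) -> list[str]:
    """All partition stream keys for a topic. The streams themselves are
    only materialized by Redis on the first XADD to each key."""
    return [stream_key(topic, p) for p in range(num_partitions)]

print(partition_keys("orders", 3))
# → ['korvet:orders:0', 'korvet:orders:1', 'korvet:orders:2']
```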
Implementation Notes
- Topic Name Validation:
  - Length: 1-249 characters
  - Allowed chars: a-z, A-Z, 0-9, ., _, -
  - Cannot be . or ..
  - Cannot start with __ (reserved for internal topics)
  - Return INVALID_TOPIC_EXCEPTION (error code 17) if invalid
- Topic Already Exists:
  - Check the broker's in-memory topic registry
  - Return TOPIC_ALREADY_EXISTS (error code 36) if found
- Partition Count:
  - Must be > 0
  - Default: 1 if not specified
  - Immutable after creation
- Replication Factor:
  - Ignored in single-node setup (always 1)
  - In a clustered setup it would determine Redis replication
- Configuration Defaults:
  - retention.ms: 604800000 (7 days)
  - retention.bytes: -1 (unlimited)
  - compression.type: none
  - storage.compression: lz4
- Metadata Storage: Topic metadata lives in the broker's in-memory registry (optionally persisted to Redis or external config), not in the partition streams
- Lazy Stream Creation:
  - Redis Streams are created automatically on the first XADD
  - Saves memory for unused topics
- Validate Only: If validate_only=true, perform all validation but don't create the topic
Delete Topic
Kafka Wire Protocol
DeleteTopicsRequest {
  topic_names: ["orders", "payments"]
  timeout_ms: 30000
}

DeleteTopicsResponse {
  throttle_time_ms: 0
  topics: [
    {
      name: "orders"
      error_code: 0  // NONE
      error_message: null
    },
    {
      name: "payments"
      error_code: 3  // UNKNOWN_TOPIC_OR_PARTITION
      error_message: "Topic 'payments' does not exist"
    }
  ]
}
Redis Commands Detail
# 1. Verify the topic exists in the broker's in-memory metadata registry
# If not exists: return UNKNOWN_TOPIC_OR_PARTITION (error code 3)
# 2. Get partition count from broker metadata
# For topic "orders" with 3 partitions
# 3. Destroy consumer groups for each partition
# For partition 0:
XGROUP DESTROY korvet:orders:0 my-group
XGROUP DESTROY korvet:orders:0 other-group
# For partition 1:
XGROUP DESTROY korvet:orders:1 my-group
XGROUP DESTROY korvet:orders:1 other-group
# For partition 2:
XGROUP DESTROY korvet:orders:2 my-group
XGROUP DESTROY korvet:orders:2 other-group
# Note: XGROUP DESTROY returns error if group doesn't exist - ignore errors
# 4. Delete partition streams
DEL korvet:orders:0
DEL korvet:orders:1
DEL korvet:orders:2
# 5. Remove topic from broker metadata (Redis hash)
# Alternative: Use SCAN to find all partition streams (safer)
SCAN 0 MATCH korvet:orders:* COUNT 100 TYPE stream
# Returns: cursor and list of stream keys
# Then DEL all found keys
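Steps 3-4 above can be sketched as a pure function that builds the Redis command sequence without executing it. This is an illustrative sketch, not the Korvet implementation; it assumes the broker already knows the partition count and the consumer groups attached to the topic.

```python
# Build the deletion command sequence for a topic: destroy every
# group on every partition stream, then delete the streams.

def deletion_commands(topic, num_partitions, groups, prefix="korvet"):
    keys = [f"{prefix}:{topic}:{p}" for p in range(num_partitions)]
    cmds = []
    for key in keys:
        for group in groups:
            # XGROUP DESTROY errors on missing groups; callers ignore those
            cmds.append(("XGROUP", "DESTROY", key, group))
    cmds.append(("DEL", *keys))  # or UNLINK for non-blocking deletion
    return cmds

for cmd in deletion_commands("orders", 2, ["my-group"]):
    print(" ".join(cmd))
```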
Implementation Notes
- Topic Doesn't Exist: Return UNKNOWN_TOPIC_OR_PARTITION (error code 3)
- Active Consumers:
  - Kafka typically prevents deletion while consumers are active
  - Option 1: Check for active consumers and return an error
  - Option 2: Force-delete and let consumers handle the resulting errors
- Consumer Group Cleanup:
  - Must destroy the Redis consumer groups for all partitions
  - Use XGROUP DESTROY for each group/partition combination
  - Ignore errors if a group doesn't exist for a partition
- Atomicity:
  - Use a Lua script to ensure atomic deletion
  - If any step fails, the entire operation should fail
- Partial Deletion: If deletion fails midway, the topic may be left in an inconsistent state
  - Implement a cleanup/recovery mechanism
  - Or use a transaction with rollback capability
- Large Topics: For topics with many partitions or large streams:
  - Consider async deletion (mark for deletion, clean up in the background)
  - Use UNLINK instead of DEL for non-blocking deletion
- Cascading Deletes: Also delete the group offset commits for this topic:
# For each group, remove committed offsets for this topic's partitions
HDEL korvet:group:my-group:offsets korvet:orders:0 korvet:orders:1 korvet:orders:2
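The cascading offset cleanup above can be sketched the same way: one HDEL per group, removing the fields for every partition of the deleted topic. The key layout (`korvet:group:<group>:offsets`) is taken from the HDEL example; the helper name is an assumption.

```python
# Build the HDEL commands that drop committed offsets for a deleted
# topic from each consumer group's offset hash.

def offset_cleanup_commands(topic, num_partitions, groups, prefix="korvet"):
    fields = [f"{prefix}:{topic}:{p}" for p in range(num_partitions)]
    return [("HDEL", f"{prefix}:group:{g}:offsets", *fields) for g in groups]
```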
Metadata Request (List Topics)
Kafka Wire Protocol
MetadataRequest {
  topics: null  // null = all topics, or specific topic list
  allow_auto_topic_creation: false
  include_cluster_authorized_operations: false
  include_topic_authorized_operations: false
}

MetadataResponse {
  throttle_time_ms: 0
  brokers: [
    {
      node_id: 0
      host: "localhost"
      port: 9092
      rack: null
    }
  ]
  cluster_id: "korvet-cluster-1"
  controller_id: 0
  topics: [
    {
      error_code: 0
      name: "orders"
      is_internal: false
      partitions: [
        {
          error_code: 0
          partition: 0
          leader: 0  // broker node_id
          replica_nodes: [0]
          isr_nodes: [0]  // in-sync replicas
          offline_replicas: []
        },
        {
          error_code: 0
          partition: 1
          leader: 0
          replica_nodes: [0]
          isr_nodes: [0]
          offline_replicas: []
        },
        {
          error_code: 0
          partition: 2
          leader: 0
          replica_nodes: [0]
          isr_nodes: [0]
          offline_replicas: []
        }
      ]
    },
    {
      error_code: 0
      name: "payments"
      is_internal: false
      partitions: [
        {
          error_code: 0
          partition: 0
          leader: 0
          replica_nodes: [0]
          isr_nodes: [0]
          offline_replicas: []
        }
      ]
    }
  ]
}
Redis Commands Detail
# Metadata requests are served from the broker's in-memory topic registry
# No Redis commands needed
# Broker returns topic metadata from memory/config:
# - Topic names
# - Partition counts
# - Retention policies
# - Compression settings
# - etc.
# Optional: Use SCAN to discover existing partition streams
SCAN 0 MATCH korvet:*:* COUNT 100 TYPE stream
# Returns: cursor and list of stream keys like ["korvet:orders:0", "korvet:orders:1", "korvet:payments:0"]
# Can be used to verify which topics/partitions actually have data
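Assembling the topic section of the MetadataResponse from the in-memory registry can be sketched as below. This is an illustrative sketch under the single-node assumption (every partition has leader=0, replicas=[0], isr=[0]); the registry shape ({topic_name: num_partitions}) and function name are assumptions.

```python
UNKNOWN_TOPIC_OR_PARTITION = 3  # Kafka protocol error code

def topic_metadata(registry, requested=None):
    """Build the topics array of a MetadataResponse.
    requested=None mirrors topics=null (all topics)."""
    names = list(registry) if requested is None else requested
    out = []
    for name in names:
        if name not in registry:
            # Non-existent topics are included with an error code
            out.append({"error_code": UNKNOWN_TOPIC_OR_PARTITION,
                        "name": name, "partitions": []})
            continue
        parts = [{"error_code": 0, "partition": p, "leader": 0,
                  "replica_nodes": [0], "isr_nodes": [0],
                  "offline_replicas": []}
                 for p in range(registry[name])]
        out.append({"error_code": 0, "name": name,
                    "is_internal": False, "partitions": parts})
    return out
```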
Implementation Notes
- All Topics vs Specific Topics:
  - If topics=null: Return metadata for all topics from the broker registry
  - If topics=[…]: Return metadata only for the specified topics
  - Non-existent topics: Include in the response with an UNKNOWN_TOPIC_OR_PARTITION error
- Broker Information:
  - Single-node setup: Always return one broker (node_id=0, this server)
  - Clustered setup: Return all Korvet broker nodes
- Partition Leadership:
  - Single-node: All partitions have leader=0, replicas=[0], isr=[0]
  - Clustered: Distribute partition leadership across brokers
- Controller:
  - Single-node: controller_id=0 (this broker)
  - Clustered: Elect a controller using Redis or external coordination
- Auto Topic Creation:
  - If allow_auto_topic_creation=true and the topic doesn't exist:
    - Create the topic with default settings
    - Return metadata for the newly created topic
  - If false: Return an error for non-existent topics
- Performance: Use pipelining to fetch metadata for multiple topics in parallel
- Caching: Cache topic metadata in broker memory; invalidate on topic changes
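The auto-creation path above can be sketched as a registry lookup with an optional create-on-miss. This is a sketch, not the Korvet implementation: the function name and registry shape are assumptions, and the defaults are the Configuration Defaults listed earlier.

```python
UNKNOWN_TOPIC_OR_PARTITION = 3  # Kafka protocol error code
DEFAULT_PARTITIONS = 1          # create-topic default from above

def get_or_create(registry, name, allow_auto_create):
    """Return (metadata, error_code). Creates the topic with default
    settings when allow_auto_create is true and the topic is missing."""
    if name in registry:
        return registry[name], None
    if allow_auto_create:
        registry[name] = {
            "num_partitions": DEFAULT_PARTITIONS,
            "retention.ms": 604800000,   # 7 days
            "retention.bytes": -1,       # unlimited
            "compression.type": "none",
            "storage.compression": "lz4",
        }
        return registry[name], None
    return None, UNKNOWN_TOPIC_OR_PARTITION
```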