# Topic Management

This guide covers creating and managing topics in Korvet.

Server defaults for new topics are configured via `korvet.topics.*` in Spring configuration.
## Creating Topics

### Automatic Topic Creation
By default, topics are automatically created when you first produce to them or request metadata for them. This behavior can be controlled with the `korvet.topics.auto-create` configuration:
```yaml
korvet:
  topics:
    auto-create: true   # Enable/disable automatic topic creation (default: true)
    partitions: 1       # Default partitions for auto-created topics (default: 1)
```
When auto-creation is disabled, you must explicitly create topics before using them.
### Explicit Topic Creation
You can create topics explicitly using the Kafka command-line tools:
```bash
kafka-topics --bootstrap-server localhost:9092 \
  --create \
  --topic my-topic \
  --partitions 3 \
  --replication-factor 1
```
The `--replication-factor` parameter is accepted for compatibility but ignored, as Korvet uses Redis for storage and replication.
### Creating Topics with Korvet-Specific Configuration
The `kafka-topics` CLI performs client-side validation of configuration keys, so Korvet-specific configs cannot be passed to `--create` directly. To create topics with Korvet-specific configurations, use a two-step approach:
```bash
# Step 1: Create topic with standard Kafka configs
kafka-topics --bootstrap-server localhost:9092 \
  --create \
  --topic my-topic \
  --partitions 3 \
  --config retention.ms=604800000

# Step 2: Add Korvet-specific configs using kafka-configs
kafka-configs --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name my-topic \
  --alter \
  --add-config value.type=raw,storage.compression=zstd
```
Alternatively, use the Kafka AdminClient API, which does not perform client-side validation:
```java
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient admin = AdminClient.create(props)) {
    NewTopic topic = new NewTopic("my-topic", 3, (short) 1);
    topic.configs(Map.of(
        "value.type", "raw",
        "storage.compression", "zstd",
        "retention.ms", "604800000"
    ));
    admin.createTopics(List.of(topic)).all().get();
}
```
Available configurations:

- `value.type` - How message values are stored (`auto`, `json`, or `raw`). Default: `auto`
- `storage.compression` - Compression for values at rest in Redis (`none`, `gzip`, `snappy`, `lz4`, `zstd`). Default: `lz4`
- `compression.type` - Compression for Kafka fetch responses (`none`, `gzip`, `snappy`, `lz4`, `zstd`)
- `retention.ms` - Total time-based retention in milliseconds (across all tiers)
- `retention.bytes` - Total size-based retention in bytes
Tiered storage configurations (when remote storage is enabled at server level):

- `remote.storage.enable` - Enable tiered storage for this topic (Kafka KIP-405). Default: `false`
- `local.retention.ms` - Time to keep in local tier (Redis RAM) before archiving. `-2` = use `retention.ms` (Kafka KIP-405)
- `local.retention.bytes` - Size to keep in local tier before archiving. `-2` = use `retention.bytes` (Kafka KIP-405)
- `bucket.duration.ms` - Time bucket duration in milliseconds for stream bucketing. Default: `3600000` (1 hour). Shorter durations enable faster archival but create more Redis keys (Korvet extension)
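To make the bucketing arithmetic concrete, here is a minimal sketch (illustrative only, not Korvet's internal code; class and method names are invented) of how a record timestamp maps to a time bucket and when a bucket becomes sealed and archivable:

```java
public class BucketMath {
    static final long BUCKET_DURATION_MS = 3_600_000L; // 1 hour, the default

    // Start of the time bucket containing the given record timestamp
    static long bucketStart(long timestampMs) {
        return timestampMs - (timestampMs % BUCKET_DURATION_MS);
    }

    // A bucket is sealed (completed, eligible for archival) once its end has passed
    static boolean isSealed(long bucketStartMs, long nowMs) {
        return bucketStartMs + BUCKET_DURATION_MS <= nowMs;
    }

    public static void main(String[] args) {
        long ts = 7_512_345L;                             // some record timestamp
        System.out.println(bucketStart(ts));              // 7200000 (third hour bucket)
        System.out.println(isSealed(7_200_000L, 10_800_000L)); // true: bucket has ended
        System.out.println(isSealed(7_200_000L, 9_000_000L));  // false: bucket still open
    }
}
```

With a shorter `bucket.duration.ms`, buckets seal sooner, so data becomes archivable faster, at the cost of more Redis keys.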
For large messages or deeply nested JSON, use `--config value.type=raw --config storage.compression=zstd` to reduce Redis memory usage by up to 51x.
## Describing Topics
Get details about a topic:
```bash
kafka-topics --bootstrap-server localhost:9092 \
  --describe \
  --topic my-topic
```
## Deleting Topics
Delete a topic:
```bash
kafka-topics --bootstrap-server localhost:9092 \
  --delete \
  --topic my-topic
```
## Altering Topic Configuration
Topics can be configured with:

- **Partitions**: Number of partitions for parallelism (set during creation only)
- **Retention**: Time-based (`retention.ms`) and size-based (`retention.bytes`) retention policies
- **Value Type**: How message values are stored (`value.type`: `auto`, `json`, or `raw`)
- **Storage Compression**: Compression for values at rest in Redis (`storage.compression`)
- **Protocol Compression**: Compression for Kafka fetch responses (`compression.type`)
### Using kafka-configs CLI

Use `kafka-configs` to alter topic configurations:
```bash
kafka-configs --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name my-topic \
  --alter \
  --add-config value.type=raw,storage.compression=zstd,compression.type=lz4
```
### Using AdminClient API

Alternatively, use the AdminClient API:
```java
ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
List<AlterConfigOp> ops = List.of(
    new AlterConfigOp(new ConfigEntry("value.type", "raw"), AlterConfigOp.OpType.SET),
    new AlterConfigOp(new ConfigEntry("storage.compression", "zstd"), AlterConfigOp.OpType.SET)
);
admin.incrementalAlterConfigs(Map.of(topicResource, ops)).all().get();
```
Value types:

- `auto` - Automatically detect optimal storage: JSON flattening for JSON objects, RAW for everything else (default)
- `json` - Flatten top-level JSON fields into separate Redis Stream fields
- `raw` - Store entire value as single field with optional compression
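To illustrate the difference between the modes, here is a rough sketch (assumed behavior, not Korvet's actual implementation; the `{`-prefix heuristic and all names are invented for illustration) of detecting a JSON object and flattening its top-level fields into the flat field/value pairs a Redis Stream entry requires:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ValueTypeSketch {
    // Rough "auto" heuristic: values that look like a JSON object get
    // flattened; everything else falls back to RAW storage.
    static String chooseValueType(byte[] value) {
        return (value.length > 0 && value[0] == '{') ? "json" : "raw";
    }

    // "json" mode: each top-level field becomes a separate Redis Stream
    // field/value pair. String.valueOf stands in for real JSON re-serialization.
    static Map<String, String> flattenTopLevel(Map<String, Object> json) {
        Map<String, String> fields = new LinkedHashMap<>();
        json.forEach((key, value) -> fields.put(key, String.valueOf(value)));
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(chooseValueType("{\"a\":1}".getBytes()));   // json
        System.out.println(chooseValueType("plain text".getBytes()));  // raw

        Map<String, Object> event = new LinkedHashMap<>();
        event.put("orderId", 42);
        event.put("status", "shipped");
        System.out.println(flattenTopLevel(event)); // {orderId=42, status=shipped}
    }
}
```

In `raw` mode, by contrast, the whole value would be stored under a single field, which is what makes per-value compression effective.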
Storage compression types:

- `none` - No compression (default)
- `gzip` - Good compression ratio (26x), slower
- `snappy` - Fast, moderate compression (10x)
- `lz4` - Very fast, good compression (14x)
- `zstd` - Best compression ratio (51x), fast - **Recommended**
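Actual savings depend on your payloads; the ratios above reflect highly compressible data. As a rough, standalone illustration of why repetitive JSON compresses so well at rest (using JDK gzip, since zstd requires a third-party library):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class CompressionDemo {
    static byte[] gzip(byte[] input) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(input);
        }
        return bos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // Repetitive JSON, typical of event payloads, compresses very well
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < 1000; i++) {
            sb.append("{\"orderId\":").append(i).append(",\"status\":\"shipped\"},");
        }
        sb.setLength(sb.length() - 1);
        sb.append("]");

        byte[] raw = sb.toString().getBytes(StandardCharsets.UTF_8);
        byte[] packed = gzip(raw);
        System.out.println(raw.length + " bytes -> " + packed.length + " bytes");
    }
}
```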
Use `value.type=raw` with `storage.compression=zstd` for deeply nested JSON or large messages (>10KB) to reduce Redis memory usage by up to 51x.
See Value Mapper Selection for detailed guidance on choosing between JSON flattening and RAW storage.
### Describing Topic Configuration

View the current topic configuration using `kafka-configs --describe`:
```bash
kafka-configs --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name my-topic \
  --describe
```
Protocol compression types:

- `none` - No compression (default)
- `gzip` - Good compression ratio, higher CPU usage
- `snappy` - Balanced compression and speed
- `lz4` - Fast compression, lower CPU usage
- `zstd` - Best compression ratio, moderate CPU usage
Storage compression and protocol compression are independent. You can use both together (e.g., `storage.compression=zstd` for Redis memory savings and `compression.type=lz4` for fast network compression).
See Compression for more details on protocol compression.
## Tiered Storage Configuration
When tiered storage is enabled at the server level, you can configure per-topic retention policies to control when data moves between tiers.
### Configuring Tiered Storage with AdminClient API

Use the AdminClient API to configure tiered storage (since `kafka-configs --alter` is not supported):
```java
NewTopic topic = new NewTopic("my-topic", 3, (short) 1);
topic.configs(Map.of(
    "remote.storage.enable", "true",
    "retention.ms", "31536000000",   // 1 year total
    "local.retention.ms", "86400000" // 1 day in local tier
));
admin.createTopics(List.of(topic)).all().get();
```
This configures:

- **Local tier**: 1 day (`local.retention.ms=86400000`)
- **Remote tier**: ~364 days (implicit: `retention.ms - local.retention.ms`)
- **Total retention**: 1 year (`retention.ms=31536000000`)
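The remote-tier window is not set directly; it falls out of the other two values. A quick sketch of the arithmetic (the class and variable names here are illustrative, not part of any API):

```java
public class TierRetentionMath {
    public static void main(String[] args) {
        long dayMs = 86_400_000L;                  // milliseconds in one day
        long retentionMs = 31_536_000_000L;        // total retention: 1 year (365 days)
        long localRetentionMs = 86_400_000L;       // local tier (Redis RAM): 1 day

        // Implicit remote-tier window: whatever total retention remains
        // after the local tier's share.
        long remoteMs = retentionMs - localRetentionMs;
        System.out.println(remoteMs / dayMs + " days in remote tier"); // 364 days
    }
}
```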
### Tiered Storage Configuration Reference

| Configuration | Default | Description |
|---|---|---|
| `remote.storage.enable` | `false` | Enable tiered storage for this topic (Kafka KIP-405) |
| `local.retention.ms` | `-2` | Time to keep in local tier before archiving. `-2` = use `retention.ms` |
| `local.retention.bytes` | `-2` | Size to keep in local tier. `-2` = use `retention.bytes` |
| `retention.ms` | `604800000` | Total retention across all tiers (7 days default) |
| `bucket.duration.ms` | `3600000` | Time bucket duration in milliseconds. Streams are partitioned into time buckets; only sealed (completed) buckets are archived. Shorter durations enable faster archival but create more Redis keys. (Korvet extension) |
Remote tier retention is implicit and calculated as `retention.ms - local.retention.ms`. Data is deleted after the total `retention.ms` period.
See Remote Storage for server-level tiered storage configuration.