Topic Management
This guide covers creating and managing topics in Korvet.
Automatic topic creation is configured via korvet.topics.auto-create. Defaults applied to newly created topics are configured via korvet.topics.*.
Creating Topics
Automatic Topic Creation
By default, topics are not automatically created when you first produce to them or request metadata for them.
This behavior can be controlled with the korvet.topics.auto-create configuration:
korvet:
  topics:
    auto-create: false   # Enable/disable automatic topic creation (default: false)
    partitions: 1        # Default partitions for auto-created topics (default: 1)
When auto-creation is disabled, you must explicitly create topics before using them.
Explicit Topic Creation
You can create topics explicitly with standard Kafka tooling or with the bundled korvet-cli.
Using korvet-cli topics
korvet-cli topics mirrors the kafka-topics syntax and additionally accepts Korvet-specific configs such as
storage.compression, value.type, offset.sequence.bits, and bucket.duration.ms.
korvet-cli topics --bootstrap-server localhost:9092 \
--create \
--topic my-topic \
--partitions 3 \
--config retention.ms=604800000 \
--config value.type=raw \
--config storage.compression=zstd \
--config bucket.duration.ms=3600000
--replication-factor is accepted for compatibility but ignored, as Korvet uses Redis for storage and replication.
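The retention.ms and bucket.duration.ms values in the command above are plain millisecond counts, which are easy to mistype. As a small sketch (TopicDurations and toConfigMillis are hypothetical helper names, not part of Korvet or Kafka), java.time.Duration can derive them:

```java
import java.time.Duration;

// Hypothetical helper: derives millisecond strings for *.ms topic configs.
final class TopicDurations {

    // Convert a Duration to the millisecond string expected by configs such as retention.ms.
    static String toConfigMillis(Duration duration) {
        return Long.toString(duration.toMillis());
    }

    public static void main(String[] args) {
        // 7 days of retention, as used in the korvet-cli example.
        System.out.println("retention.ms=" + toConfigMillis(Duration.ofDays(7)));
        // 1-hour buckets, as used in the korvet-cli example.
        System.out.println("bucket.duration.ms=" + toConfigMillis(Duration.ofHours(1)));
    }
}
```

Running it prints retention.ms=604800000 and bucket.duration.ms=3600000, the values passed with --config above.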
Using kafka-topics
Upstream Kafka CLI tooling works for standard Kafka topic configs:
kafka-topics --bootstrap-server localhost:9092 \
--create \
--topic my-topic \
--partitions 3 \
--replication-factor 1
Creating Topics with Korvet-Specific Configuration
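The upstream kafka-topics tool validates topic config names client-side and rejects Korvet-specific keys. Use korvet-cli topics instead, or one of the approaches below.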
To create topics with Korvet-specific configurations, use a two-step approach:
# Step 1: Create topic with standard Kafka configs
kafka-topics --bootstrap-server localhost:9092 \
--create \
--topic my-topic \
--partitions 3 \
--config retention.ms=604800000
# Step 2: Add Korvet-specific configs using kafka-configs
kafka-configs --bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name my-topic \
--alter \
--add-config value.type=raw,storage.compression=zstd
Alternatively, use the Kafka AdminClient API which does not perform client-side validation:
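import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
try (AdminClient admin = AdminClient.create(props)) {
    // The replication factor is required by the API but ignored by Korvet.
    NewTopic topic = new NewTopic("my-topic", 3, (short) 1);
    topic.configs(Map.of(
        "value.type", "raw",
        "storage.compression", "zstd",
        "retention.ms", "604800000"
    ));
    // Blocks until the topic has been created (or the request fails).
    admin.createTopics(List.of(topic)).all().get();
}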
Available configurations:
- value.type - How message values are stored (auto, json, or raw). Default: auto
- storage.compression - Compression for values at rest in Redis (none, gzip, snappy, lz4, zstd). Default: lz4
- compression.type - Compression for Kafka fetch responses (none, gzip, snappy, lz4, zstd)
- offset.sequence.bits - Bits reserved for the per-millisecond sequence component in Korvet offsets. Range: 1-16. Default: 10. Immutable after topic creation.
- bucket.duration.ms - Duration of each local stream bucket in milliseconds. Optional. Must be positive and less than the effective local retention window.
- retention.ms - Total time-based retention in milliseconds (across all tiers)
- retention.bytes - Total size-based retention in bytes
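To make the effect of offset.sequence.bits concrete: reserving n bits for the sequence allows 2^n distinct sequence values per millisecond. The sketch below assumes, purely for illustration, that an offset packs the millisecond timestamp into the high bits and the sequence into the low bits. This guide does not specify the actual layout, and OffsetLayout and its methods are hypothetical names.

```java
// Hypothetical illustration of a timestamp + sequence offset layout.
// The real Korvet encoding may differ; only the 1-16 range and the
// "bits reserved for the sequence" idea come from the documentation.
final class OffsetLayout {
    private final int sequenceBits;

    OffsetLayout(int sequenceBits) {
        if (sequenceBits < 1 || sequenceBits > 16) {
            throw new IllegalArgumentException("offset.sequence.bits must be in the range 1-16");
        }
        this.sequenceBits = sequenceBits;
    }

    // Number of distinct sequence values available within one millisecond.
    long maxEntriesPerMillisecond() {
        return 1L << sequenceBits;
    }

    // Assumed layout: timestamp in the high bits, sequence in the low bits.
    long encode(long timestampMs, long sequence) {
        if (timestampMs < 0 || sequence < 0 || sequence >= maxEntriesPerMillisecond()) {
            throw new IllegalArgumentException("timestamp or sequence out of range");
        }
        return (timestampMs << sequenceBits) | sequence;
    }

    long timestampMs(long offset) {
        return offset >>> sequenceBits;
    }

    long sequence(long offset) {
        return offset & (maxEntriesPerMillisecond() - 1);
    }
}
```

Under this assumed layout, the default of 10 bits leaves room for 1024 entries per millisecond, and 16 bits for 65536.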
Tiered storage configurations (when remote storage is enabled at server level):
- remote.storage.enable - Enable tiered storage for this topic (Kafka KIP-405). Default: false
- local.retention.ms - Time to keep in the local tier before Redis data expires. -2 = use retention.ms (Kafka KIP-405)
- local.retention.bytes - Size to keep in the local tier before Redis trimming falls back to retention.bytes. -2 = use retention.bytes (Kafka KIP-405)

TIP: For large messages or deeply nested JSON, use --config value.type=raw --config storage.compression=zstd to reduce Redis memory usage by up to 51x.
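The bucket.duration.ms rule (positive and less than the effective local retention window) can be checked before a topic is created. The sketch below assumes that the effective local retention window is local.retention.ms, or retention.ms when local.retention.ms is -2, and that both retention values are finite and positive. BucketDurationCheck is a hypothetical helper, not Korvet code.

```java
// Hypothetical pre-flight check for bucket.duration.ms.
final class BucketDurationCheck {
    // Sentinel described above: -2 means "use retention.ms".
    static final long USE_TOTAL_RETENTION = -2L;

    // Assumption: the effective local window falls back to retention.ms on -2.
    static long effectiveLocalRetentionMs(long retentionMs, long localRetentionMs) {
        return localRetentionMs == USE_TOTAL_RETENTION ? retentionMs : localRetentionMs;
    }

    // True when the bucket duration is positive and shorter than the local window.
    static boolean isValidBucketDuration(long bucketDurationMs, long retentionMs, long localRetentionMs) {
        long window = effectiveLocalRetentionMs(retentionMs, localRetentionMs);
        return bucketDurationMs > 0 && bucketDurationMs < window;
    }
}
```

For example, a 1-hour bucket (3600000) passes against 7 days of retention (604800000) with local.retention.ms=-2, but fails if local.retention.ms is lowered to 30 minutes (1800000).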
Describing Topics
Get details about a topic:
kafka-topics --bootstrap-server localhost:9092 \
--describe \
--topic my-topic
Deleting Topics
Delete a topic:
kafka-topics --bootstrap-server localhost:9092 \
--delete \
--topic my-topic
Altering Topic Configuration
Topics can be configured with:
- Partitions: Number of partitions for parallelism (set during creation only)
- Retention: Time-based (retention.ms) and size-based (retention.bytes) retention policies
- Value Type: How message values are stored (value.type: auto, json, or raw)
- Storage Compression: Compression for values at rest in Redis (storage.compression)
- Protocol Compression: Compression for Kafka fetch responses (compression.type)
- Offset Encoding: Per-topic offset sequence width (offset.sequence.bits, create-time only)
- Bucketing: Time-bucketed local streams (bucket.duration.ms)
Using kafka-configs CLI
Use korvet-cli topics --alter or kafka-configs to alter topic configurations.
korvet-cli topics --bootstrap-server localhost:9092 \
--alter \
--topic my-topic \
--config value.type=raw \
--config storage.compression=zstd \
--config bucket.duration.ms=1800000
kafka-configs --bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name my-topic \
--alter \
--add-config value.type=raw,storage.compression=zstd,compression.type=lz4
offset.sequence.bits cannot be altered after topic creation. kafka-configs --alter can update Korvet-specific keys on an existing topic because it does not reject unknown topic config names client-side.
Using AdminClient API
Alternatively, use the AdminClient API:
// Assumes `admin` is an open AdminClient, created as in the topic creation example.
ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
List<AlterConfigOp> ops = List.of(
    new AlterConfigOp(new ConfigEntry("value.type", "raw"), AlterConfigOp.OpType.SET),
    new AlterConfigOp(new ConfigEntry("storage.compression", "zstd"), AlterConfigOp.OpType.SET)
);
admin.incrementalAlterConfigs(Map.of(topicResource, ops)).all().get();
Value types:
- auto - Automatically detect optimal storage: JSON flattening for JSON objects, RAW for everything else (default)
- json - Flatten top-level JSON fields into separate Redis Stream fields
- raw - Store entire value as single field with optional compression

Storage compression types:
- none - No compression (default)
- gzip - Good compression ratio (26x), slower
- snappy - Fast, moderate compression (10x)
- lz4 - Very fast, good compression (14x)
- zstd - Best compression ratio (51x), fast - Recommended
Use value.type=raw with storage.compression=zstd for deeply nested JSON or large messages (>10KB) to reduce Redis memory usage by up to 51x.
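The ratios quoted for each storage compression type are best-case figures. As a rough sketch of what they imply for Redis memory, the hypothetical StorageCompression enum below divides an uncompressed size by those ratios; real savings depend on the payload, so treat the result as a lower bound rather than a prediction.

```java
// Hypothetical estimator using the best-case ratios listed in this guide.
enum StorageCompression {
    NONE(1), GZIP(26), SNAPPY(10), LZ4(14), ZSTD(51);

    final int bestCaseRatio;

    StorageCompression(int bestCaseRatio) {
        this.bestCaseRatio = bestCaseRatio;
    }

    // Smallest size the documented ratio would allow, rounded up to whole bytes.
    long bestCaseBytes(long uncompressedBytes) {
        return (uncompressedBytes + bestCaseRatio - 1) / bestCaseRatio;
    }

    public static void main(String[] args) {
        long tenGiB = 10L * 1024 * 1024 * 1024;
        for (StorageCompression c : values()) {
            System.out.println(c + ": at least " + c.bestCaseBytes(tenGiB) + " bytes");
        }
    }
}
```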
See Value Mapper Selection for detailed guidance on choosing between JSON flattening and RAW storage.
Describing Topic Configuration
View current topic configuration using korvet-cli topics --describe or kafka-configs --describe:
korvet-cli topics --bootstrap-server localhost:9092 \
--describe \
--topic my-topic
kafka-configs --bootstrap-server localhost:9092 \
--entity-type topics \
--entity-name my-topic \
--describe
Protocol compression types:
- none - No compression (default)
- gzip - Good compression ratio, higher CPU usage
- snappy - Balanced compression and speed
- lz4 - Fast compression, lower CPU usage
- zstd - Best compression ratio, moderate CPU usage
Storage compression and protocol compression are independent. You can use both together (for example, storage.compression=zstd for Redis memory savings and compression.type=lz4 for fast network compression).
See Compression for more details on protocol compression.
Tiered Storage Configuration
When tiered storage is enabled at the server level, you can configure per-topic retention policies to control when data moves between tiers.
Configuring Tiered Storage with AdminClient API
Use the AdminClient API to configure tiered storage (since kafka-configs --alter is not supported):
// Assumes `admin` is an open AdminClient, created as in the topic creation example.
NewTopic topic = new NewTopic("my-topic", 3, (short) 1);
topic.configs(Map.of(
    "remote.storage.enable", "true",
    "retention.ms", "31536000000",     // 1 year total
    "local.retention.ms", "86400000"   // 1 day in local tier
));
admin.createTopics(List.of(topic)).all().get();
This configures:
- Local tier: 1 day (local.retention.ms=86400000)
- Remote tier: ~364 days (implicit: retention.ms - local.retention.ms)
- Total retention: 1 year (retention.ms=31536000000)
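The implicit remote-tier figure follows directly from the two configured values. A minimal sketch of that arithmetic, using a hypothetical TierRetention helper and assuming both values are explicit, non-negative millisecond counts:

```java
import java.time.Duration;

// Hypothetical helper: splits total retention into local and remote portions.
final class TierRetention {

    // Remote tier retention is retention.ms minus local.retention.ms.
    static long remoteRetentionMs(long retentionMs, long localRetentionMs) {
        if (localRetentionMs < 0 || localRetentionMs > retentionMs) {
            throw new IllegalArgumentException("local retention must be between 0 and total retention");
        }
        return retentionMs - localRetentionMs;
    }

    public static void main(String[] args) {
        long total = 31_536_000_000L; // retention.ms: 365 days
        long local = 86_400_000L;     // local.retention.ms: 1 day
        long remote = remoteRetentionMs(total, local);
        System.out.println("remote tier: " + Duration.ofMillis(remote).toDays() + " days");
    }
}
```

With the values from the example this prints "remote tier: 364 days".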
Tiered Storage Configuration Reference
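| Configuration | Default | Description |
|---|---|---|
| remote.storage.enable | false | Enable tiered storage for this topic (Kafka KIP-405) |
| local.retention.ms | -2 | Time to keep in the local tier before Redis data expires. -2 = use retention.ms |
| local.retention.bytes | -2 | Size to keep in the local tier before Redis trimming falls back to retention.bytes. -2 = use retention.bytes |
| retention.ms | 604800000 | Total retention across all tiers (7 days default) |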
Remote tier retention is implicit and calculated as retention.ms - local.retention.ms. Data is deleted after the total retention.ms period.
See Remote Storage for server-level tiered storage configuration.