Topic Management

This guide covers creating and managing topics in Korvet.

Server defaults for new topics are configured via korvet.topics.* in Spring configuration.

Creating Topics

Automatic Topic Creation

By default, topics are automatically created when you first produce to them or request metadata for them. This behavior can be controlled with the korvet.topics.auto-create configuration:

korvet:
  topics:
    auto-create: true  # Enable/disable automatic topic creation (default: true)
    partitions: 1      # Default partitions for auto-created topics (default: 1)

When auto-creation is disabled, you must explicitly create topics before using them.
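The auto-creation rules above can be modeled as a small lookup-or-create decision. This is a minimal sketch that is useful for reasoning about when a topic appears. It is not Korvet's implementation: TopicRegistry, resolve, and UnknownTopicException are hypothetical names, and the two constructor arguments stand in for korvet.topics.auto-create and korvet.topics.partitions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the auto-create behavior described above.
// None of these names are Korvet classes.
class TopicRegistry {
    static class UnknownTopicException extends RuntimeException {
        UnknownTopicException(String topic) {
            super("Unknown topic: " + topic);
        }
    }

    private final boolean autoCreate;     // stands in for korvet.topics.auto-create
    private final int defaultPartitions;  // stands in for korvet.topics.partitions
    private final Map<String, Integer> partitionsByTopic = new HashMap<>();

    TopicRegistry(boolean autoCreate, int defaultPartitions) {
        this.autoCreate = autoCreate;
        this.defaultPartitions = defaultPartitions;
    }

    // Explicit creation (e.g. kafka-topics --create) works regardless of autoCreate.
    // In this model, creating an existing topic is a no-op.
    void create(String topic, int partitions) {
        partitionsByTopic.putIfAbsent(topic, partitions);
    }

    // Called on the first produce or metadata request for a topic.
    int resolve(String topic) {
        Integer existing = partitionsByTopic.get(topic);
        if (existing != null) {
            return existing;
        }
        if (!autoCreate) {
            throw new UnknownTopicException(topic);
        }
        partitionsByTopic.put(topic, defaultPartitions);
        return defaultPartitions;
    }

    public static void main(String[] args) {
        TopicRegistry registry = new TopicRegistry(true, 1);
        System.out.println(registry.resolve("orders")); // 1: auto-created with the default
    }
}
```

With auto-creation disabled, resolve fails until create has been called. This mirrors the requirement to create topics explicitly before using them.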

Explicit Topic Creation

You can create topics explicitly using the Kafka command-line tools:

kafka-topics --bootstrap-server localhost:9092 \
  --create \
  --topic my-topic \
  --partitions 3 \
  --replication-factor 1

The replication-factor parameter is accepted for compatibility but ignored, as Korvet uses Redis for storage and replication.

Creating Topics with Korvet-Specific Configuration

The kafka-topics CLI performs client-side validation and rejects unknown configs such as storage.compression and value.type with an Unknown topic config name error.

To create a topic with Korvet-specific configurations, either use the two-step approach below or use the Kafka AdminClient API. The two-step approach creates the topic with standard Kafka configs first, then adds the Korvet-specific configs with kafka-configs:

# Step 1: Create topic with standard Kafka configs
kafka-topics --bootstrap-server localhost:9092 \
  --create \
  --topic my-topic \
  --partitions 3 \
  --config retention.ms=604800000

# Step 2: Add Korvet-specific configs using kafka-configs
kafka-configs --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name my-topic \
  --alter \
  --add-config value.type=raw,storage.compression=zstd
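When a topic's configs are generated by a script, splitting them into the two steps can be automated. The sketch below separates standard Kafka configs (for kafka-topics --config) from Korvet-specific ones (joined into a single kafka-configs --add-config argument). TwoStepConfigs is a hypothetical helper. The KORVET_ONLY set is an assumption based on the configs this guide describes as Korvet-specific, so extend it if your kafka-topics version rejects other keys.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;

// Hypothetical helper: split desired topic configs into the two CLI steps.
class TwoStepConfigs {
    // ASSUMPTION: keys that kafka-topics rejects client-side.
    static final Set<String> KORVET_ONLY =
        Set.of("value.type", "storage.compression", "bucket.duration.ms");

    // Step 1: configs to pass to kafka-topics --create (one --config flag each).
    static Map<String, String> standard(Map<String, String> all) {
        Map<String, String> out = new LinkedHashMap<>();
        all.forEach((k, v) -> {
            if (!KORVET_ONLY.contains(k)) out.put(k, v);
        });
        return out;
    }

    // Step 2: comma-separated value for kafka-configs --add-config.
    static String addConfigArg(Map<String, String> all) {
        StringJoiner joiner = new StringJoiner(",");
        all.forEach((k, v) -> {
            if (KORVET_ONLY.contains(k)) joiner.add(k + "=" + v);
        });
        return joiner.toString();
    }

    public static void main(String[] args) {
        Map<String, String> desired = new LinkedHashMap<>(); // preserves insertion order
        desired.put("retention.ms", "604800000");
        desired.put("value.type", "raw");
        desired.put("storage.compression", "zstd");
        System.out.println(standard(desired));     // {retention.ms=604800000}
        System.out.println(addConfigArg(desired)); // value.type=raw,storage.compression=zstd
    }
}
```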

Alternatively, use the Kafka AdminClient API, which does not perform client-side validation:

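import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient admin = AdminClient.create(props)) {
    // 3 partitions; the replication factor is accepted but ignored by Korvet
    NewTopic topic = new NewTopic("my-topic", 3, (short) 1);
    topic.configs(Map.of(
        "value.type", "raw",
        "storage.compression", "zstd",
        "retention.ms", "604800000"   // 7 days
    ));
    // Block until the broker confirms the topic was created
    admin.createTopics(List.of(topic)).all().get();
}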

Available configurations:

  • value.type - How message values are stored (auto, json, or raw). Default: auto

  • storage.compression - Compression for values at rest in Redis (none, gzip, snappy, lz4, zstd). Default: lz4

  • compression.type - Compression for Kafka fetch responses (none, gzip, snappy, lz4, zstd)

  • retention.ms - Total time-based retention in milliseconds (across all tiers)

  • retention.bytes - Total size-based retention in bytes

Tiered storage configurations (when remote storage is enabled at server level):

  • remote.storage.enable - Enable tiered storage for this topic (Kafka KIP-405). Default: false

  • local.retention.ms - Time to keep in local tier (Redis RAM) before archiving. -2 = use retention.ms (Kafka KIP-405)

  • local.retention.bytes - Size to keep in local tier before archiving. -2 = use retention.bytes (Kafka KIP-405)

  • bucket.duration.ms - Time bucket duration in milliseconds for stream bucketing. Default: 3600000 (1 hour). Shorter durations enable faster archival but create more Redis keys (Korvet extension)
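A typo in one of these values is easiest to catch before the request reaches the server. The sketch below checks the documented value sets for value.type and storage.compression, the boolean remote.storage.enable, and that retention and bucket settings are integers. TopicConfigCheck is a hypothetical helper, not part of Korvet, and the server remains the authority on what it accepts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical pre-flight check for the topic configs listed above.
class TopicConfigCheck {
    static final Set<String> VALUE_TYPES = Set.of("auto", "json", "raw");
    static final Set<String> STORAGE_CODECS = Set.of("none", "gzip", "snappy", "lz4", "zstd");
    static final Set<String> NUMERIC_KEYS = Set.of(
        "retention.ms", "retention.bytes", "local.retention.ms",
        "local.retention.bytes", "bucket.duration.ms");

    // Returns one message per invalid entry; an empty list means no problems found.
    static List<String> problems(Map<String, String> configs) {
        List<String> errors = new ArrayList<>();
        for (Map.Entry<String, String> e : configs.entrySet()) {
            String key = e.getKey();
            String value = e.getValue();
            if (key.equals("value.type") && !VALUE_TYPES.contains(value)) {
                errors.add(key + " must be auto, json, or raw: " + value);
            } else if (key.equals("storage.compression") && !STORAGE_CODECS.contains(value)) {
                errors.add(key + " has an unknown codec: " + value);
            } else if (key.equals("remote.storage.enable")
                    && !value.equals("true") && !value.equals("false")) {
                errors.add(key + " must be true or false: " + value);
            } else if (NUMERIC_KEYS.contains(key) && !isLong(value)) {
                // -2 (the "use total retention" sentinel) passes this check
                errors.add(key + " must be an integer: " + value);
            }
        }
        return errors;
    }

    private static boolean isLong(String s) {
        try {
            Long.parseLong(s);
            return true;
        } catch (NumberFormatException ex) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(problems(Map.of("value.type", "raw", "storage.compression", "zstd"))); // []
        System.out.println(problems(Map.of("value.type", "xml")).size()); // 1
    }
}
```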

For large messages or deeply nested JSON, set value.type=raw and storage.compression=zstd (via the two-step approach or the AdminClient API) to reduce Redis memory usage by up to 51x.

Listing Topics

List all topics:

kafka-topics --bootstrap-server localhost:9092 --list

Describing Topics

Get details about a topic:

kafka-topics --bootstrap-server localhost:9092 \
  --describe \
  --topic my-topic

Deleting Topics

Delete a topic:

kafka-topics --bootstrap-server localhost:9092 \
  --delete \
  --topic my-topic

Altering Topic Configuration

Topics can be configured with:

  • Partitions: Number of partitions for parallelism (set during creation only)

  • Retention: Time-based (retention.ms) and size-based (retention.bytes) retention policies

  • Value Type: How message values are stored (value.type: auto, json, or raw)

  • Storage Compression: Compression for values at rest in Redis (storage.compression)

  • Protocol Compression: Compression for Kafka fetch responses (compression.type)

Using kafka-configs CLI

Use kafka-configs to alter topic configurations:

kafka-configs --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name my-topic \
  --alter \
  --add-config value.type=raw,storage.compression=zstd,compression.type=lz4

Using AdminClient API

Alternatively, use the AdminClient API:

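import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
// `admin` is an open AdminClient, created as in the topic creation example above
ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
List<AlterConfigOp> ops = List.of(
    new AlterConfigOp(new ConfigEntry("value.type", "raw"), AlterConfigOp.OpType.SET),
    new AlterConfigOp(new ConfigEntry("storage.compression", "zstd"), AlterConfigOp.OpType.SET)
);
admin.incrementalAlterConfigs(Map.of(topicResource, ops)).all().get();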

Value types:

  • auto - Automatically detect optimal storage: JSON flattening for JSON objects, RAW for everything else (default)

  • json - Flatten top-level JSON fields into separate Redis Stream fields

  • raw - Store entire value as single field with optional compression
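The auto rule above reduces to a single decision per value: flatten JSON objects, store everything else RAW. The sketch below shows that decision shape using a deliberately crude heuristic that inspects only the first and last non-whitespace characters. It is not Korvet's detection logic, which this guide does not describe. AutoValueType and Mapping are hypothetical names.

```java
import java.nio.charset.StandardCharsets;

// Illustration only: a crude "is this a JSON object?" check, NOT Korvet's detector.
class AutoValueType {
    enum Mapping { JSON_FLATTENED, RAW }

    static Mapping choose(byte[] value) {
        String s = new String(value, StandardCharsets.UTF_8).strip();
        // JSON arrays, scalars, and non-JSON payloads all fall through to RAW
        boolean looksLikeObject = s.length() >= 2
            && s.charAt(0) == '{'
            && s.charAt(s.length() - 1) == '}';
        return looksLikeObject ? Mapping.JSON_FLATTENED : Mapping.RAW;
    }

    public static void main(String[] args) {
        System.out.println(choose("{\"id\": 1}".getBytes(StandardCharsets.UTF_8))); // JSON_FLATTENED
        System.out.println(choose("[1, 2, 3]".getBytes(StandardCharsets.UTF_8)));   // RAW
        System.out.println(choose("plain text".getBytes(StandardCharsets.UTF_8)));  // RAW
    }
}
```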

Storage compression types:

  • none - No compression (default)

  • gzip - Good compression ratio (26x), slower

  • snappy - Fast, moderate compression (10x)

  • lz4 - Very fast, good compression (14x)

  • zstd - Best compression ratio (51x), fast - Recommended

Use value.type=raw with storage.compression=zstd for deeply nested JSON or large messages (>10KB) to reduce Redis memory usage by up to 51x.
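The compression ratios quoted above depend heavily on payload shape, so it is worth measuring representative messages before choosing a codec. The sketch below measures the gzip ratio using only the JDK. The JDK ships gzip/deflate but not snappy, lz4, or zstd, which require third-party libraries. The sample payload is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Measures raw-size / gzip-size for a payload using java.util.zip.
class CompressionRatio {
    static byte[] gzip(byte[] input) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        // Closing the stream writes the gzip trailer before we read the bytes
        try (GZIPOutputStream gz = new GZIPOutputStream(buffer)) {
            gz.write(input);
        }
        return buffer.toByteArray();
    }

    static double ratio(byte[] input) throws IOException {
        return (double) input.length / gzip(input).length;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical nested JSON payload with repeated structure
        StringBuilder json = new StringBuilder("{\"items\":[");
        for (int i = 0; i < 500; i++) {
            if (i > 0) json.append(',');
            json.append("{\"id\":").append(i)
                .append(",\"meta\":{\"region\":\"us-east-1\",\"tags\":[\"a\",\"b\"]}}");
        }
        json.append("]}");
        byte[] raw = json.toString().getBytes(StandardCharsets.UTF_8);
        System.out.printf("raw=%d bytes, gzip ratio=%.1fx%n", raw.length, ratio(raw));
    }
}
```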

See Value Mapper Selection for detailed guidance on choosing between JSON flattening and RAW storage.

Describing Topic Configuration

View the current topic configuration using kafka-configs:

kafka-configs --bootstrap-server localhost:9092 \
  --entity-type topics \
  --entity-name my-topic \
  --describe

Protocol compression types:

  • none - No compression (default)

  • gzip - Good compression ratio, higher CPU usage

  • snappy - Balanced compression and speed

  • lz4 - Fast compression, lower CPU usage

  • zstd - Best compression ratio, moderate CPU usage

Storage compression and protocol compression are independent. You can use both together (e.g., storage.compression=zstd for Redis memory savings and compression.type=lz4 for fast network compression).

See Compression for more details on protocol compression.

Tiered Storage Configuration

When tiered storage is enabled at the server level, you can configure per-topic retention policies to control when data moves between tiers.

Configuring Tiered Storage with AdminClient API

Use the AdminClient API to create a topic with tiered storage enabled:

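import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.admin.NewTopic;

// `admin` is an open AdminClient, created as in the topic creation example above
NewTopic topic = new NewTopic("my-topic", 3, (short) 1);
topic.configs(Map.of(
    "remote.storage.enable", "true",
    "retention.ms", "31536000000",           // 1 year (365 days) total
    "local.retention.ms", "86400000"         // 1 day in local tier
));
admin.createTopics(List.of(topic)).all().get();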

This configures:

  • Local tier: 1 day (local.retention.ms=86400000)

  • Remote tier: ~364 days (implicit: retention.ms - local.retention.ms)

  • Total retention: 1 year (retention.ms=31536000000)
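Hand-typed millisecond values are easy to get wrong by a factor of 1000. The sketch below derives them from java.time.Duration instead, treating "1 year" as 365 days to match retention.ms=31536000000 above. TieredRetentionValues is a hypothetical helper, and its result can be passed to NewTopic.configs.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical helper: build tiered storage configs from Durations.
class TieredRetentionValues {
    static Map<String, String> configs(Duration total, Duration local) {
        return Map.of(
            "remote.storage.enable", "true",
            "retention.ms", Long.toString(total.toMillis()),
            "local.retention.ms", Long.toString(local.toMillis()));
    }

    public static void main(String[] args) {
        Map<String, String> c = configs(Duration.ofDays(365), Duration.ofDays(1));
        System.out.println(c.get("retention.ms"));       // 31536000000
        System.out.println(c.get("local.retention.ms")); // 86400000
    }
}
```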

Tiered Storage Configuration Reference

Configuration            Default      Description
remote.storage.enable    false        Enable tiered storage for this topic (Kafka KIP-405)
local.retention.ms       -2           Time to keep in local tier before archiving. -2 = use total retention.ms
local.retention.bytes    -2           Size to keep in local tier. -2 = use total retention.bytes
retention.ms             604800000    Total retention across all tiers (7 days default)
bucket.duration.ms       3600000      Time bucket duration in milliseconds. Streams are partitioned
                                      into time buckets; only sealed (completed) buckets are archived.
                                      Shorter durations enable faster archival but create more Redis
                                      keys. (Korvet extension)
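The bucket.duration.ms trade-off can be made concrete with a small model. This sketch assumes buckets are aligned to the Unix epoch, so a record's bucket starts at floor(timestamp / duration) * duration. Korvet's actual bucket boundaries and Redis key layout are not specified in this guide, and TimeBuckets is a hypothetical helper.

```java
// Sketch of time bucketing, ASSUMING epoch-aligned buckets (not confirmed for Korvet).
class TimeBuckets {
    // Start of the bucket containing timestampMs (floorDiv also handles negatives).
    static long bucketStart(long timestampMs, long bucketDurationMs) {
        return Math.floorDiv(timestampMs, bucketDurationMs) * bucketDurationMs;
    }

    // A bucket is sealed once the clock reaches its end; only sealed buckets
    // are eligible for archival.
    static boolean isSealed(long bucketStartMs, long bucketDurationMs, long nowMs) {
        return nowMs >= bucketStartMs + bucketDurationMs;
    }

    // Buckets spanned by a retention window (rounded up): shorter buckets mean
    // more buckets for the same window.
    static long bucketsPerWindow(long windowMs, long bucketDurationMs) {
        return (windowMs + bucketDurationMs - 1) / bucketDurationMs;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        long week = 604_800_000L;
        System.out.println(bucketStart(5_400_000L, hour));    // 3600000
        System.out.println(isSealed(0L, hour, 3_599_999L));   // false
        System.out.println(bucketsPerWindow(week, hour));     // 168
        System.out.println(bucketsPerWindow(week, hour / 4)); // 672
    }
}
```

Under this model, a record written with the default 1-hour buckets can wait up to an hour before its bucket seals and becomes archivable. Switching to 15-minute buckets shortens that wait but quadruples the bucket count over a 7-day window (168 to 672).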

Remote tier retention is implicit and calculated as retention.ms - local.retention.ms. Data is deleted after the total retention.ms period.
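The retention arithmetic above works in two steps. First, the -2 sentinel on local.retention.ms resolves to the total retention.ms. Second, the remote-tier duration is the difference between the two. The sketch assumes finite, positive retention values and does not handle -1 (unlimited retention in Kafka). TierMath is a hypothetical helper.

```java
// Sketch of the tier arithmetic: resolve -2, then remote = total - local.
class TierMath {
    static final long USE_TOTAL = -2L; // sentinel: local retention = total retention

    static long effectiveLocalMs(long localRetentionMs, long retentionMs) {
        return localRetentionMs == USE_TOTAL ? retentionMs : localRetentionMs;
    }

    static long remoteMs(long localRetentionMs, long retentionMs) {
        return retentionMs - effectiveLocalMs(localRetentionMs, retentionMs);
    }

    public static void main(String[] args) {
        long year = 31_536_000_000L;
        long day = 86_400_000L;
        System.out.println(remoteMs(day, year) / day);  // 364
        System.out.println(remoteMs(USE_TOTAL, year));  // 0: remote-tier duration is zero
    }
}
```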

See Remote Storage for server-level tiered storage configuration.