
Redis Streams (Hot Storage)

Korvet uses Redis Streams as the hot storage tier for recent messages.

Why Redis Streams?

  • Low latency: Sub-millisecond read/write performance

  • Consumer groups: Built-in support for coordinated consumption

  • Persistence: AOF and RDB for durability

  • Scalability: Handles millions of messages per second

Stream Structure

Each topic partition maps to a Redis Stream:

  • Stream key: {keyspace}:{topic}:{partition} (default keyspace: korvet)

  • Message ID: Redis-generated timestamp-based ID (format: timestamp-sequence)

  • Fields: Kafka record components mapped to stream fields
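The naming scheme above can be sketched in a few lines of Java. This is only an illustration: the class StreamKeys and its methods are hypothetical names, not part of Korvet, and the sketch assumes message IDs always have the timestamp-sequence shape described above.

```java
// Hypothetical helper for illustration; not part of Korvet's API.
final class StreamKeys {
    // Builds the stream key "{keyspace}:{topic}:{partition}".
    static String streamKey(String keyspace, String topic, int partition) {
        return keyspace + ":" + topic + ":" + partition;
    }

    // Returns the millisecond timestamp part of an ID such as "1700000000000-3".
    static long timestampOf(String messageId) {
        int dash = messageId.indexOf('-');
        return Long.parseLong(messageId.substring(0, dash));
    }

    // Returns the sequence part of an ID such as "1700000000000-3".
    static long sequenceOf(String messageId) {
        int dash = messageId.indexOf('-');
        return Long.parseLong(messageId.substring(dash + 1));
    }

    public static void main(String[] args) {
        System.out.println(streamKey("korvet", "my-topic", 0)); // prints korvet:my-topic:0
        System.out.println(timestampOf("1700000000000-3"));     // prints 1700000000000
    }
}
```

Because the timestamp leads the ID, entries in a stream are naturally ordered by arrival time, which is what makes age-based trimming by ID possible.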

Message Format

Kafka records are decomposed into Redis Stream fields:

  • Key: Stored in __key field (if present)

  • Headers: Stored as __header.{name} fields (e.g., __header.content-type)

  • Value: JSON values are flattened - each top-level JSON field becomes a stream field

Example: Simple JSON message
# Kafka record with JSON value
Key: "user123"
Value: {"name": "Alice", "age": 30}
Headers: content-type=application/json

# Stored in Redis Stream as:
XADD default:my-topic:0 * \
  __key "user123" \
  __header.content-type "application/json" \
  name "\"Alice\"" \
  age "30"
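All values are JSON-encoded and stored as byte arrays. The value {"name": "Alice", "age": 30} is flattened into the separate fields name and age, each JSON-encoded: the string keeps its surrounding quotes ("\"Alice\""), while the number is stored as 30.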
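The mapping in the example can be approximated with the following Java sketch. It is not Korvet's implementation: RecordFlattener and jsonEncode are hypothetical names, the JSON value is assumed to be already parsed into a Map, only strings, numbers, and booleans are handled, and the key and headers are copied as-is to match the XADD example above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the flattening step; not Korvet's actual code.
final class RecordFlattener {
    // Minimal JSON encoding for top-level scalars (assumption: no nested objects,
    // no control characters). Not a full JSON encoder.
    static String jsonEncode(Object value) {
        if (value instanceof String) {
            String s = (String) value;
            return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
        }
        return String.valueOf(value); // numbers, booleans, null
    }

    // Builds the stream fields for one record: __key, __header.{name}, then one
    // field per top-level JSON property.
    static Map<String, String> flatten(String key,
                                       Map<String, String> headers,
                                       Map<String, Object> value) {
        Map<String, String> fields = new LinkedHashMap<>();
        if (key != null) {
            fields.put("__key", key); // only stored if the record has a key
        }
        headers.forEach((name, v) -> fields.put("__header." + name, v));
        value.forEach((name, v) -> fields.put(name, jsonEncode(v)));
        return fields;
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("content-type", "application/json");
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("name", "Alice");
        value.put("age", 30);
        // Prints one line per stream field, in insertion order.
        flatten("user123", headers, value)
            .forEach((field, v) -> System.out.println(field + " " + v));
    }
}
```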

Retention Policies

Retention policy enforcement is currently in development. Topic configurations can be set via Kafka’s CreateTopics API, but they are not yet enforced on Redis Streams.

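The following topic-level settings are accepted. The retention settings are stored but not yet enforced, while compression.type is applied to fetch responses: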

  • retention.ms: Time-based retention (default: 7 days) - not enforced

  • retention.bytes: Size-based retention (default: unlimited) - not enforced

  • compression.type: Compression type for fetch responses (none, gzip, snappy, lz4, zstd)

Example: Create topic with retention configuration
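import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");

// try-with-resources closes the admin client when the block exits
try (AdminClient admin = AdminClient.create(props)) {
    NewTopic topic = new NewTopic("my-topic", 3, (short) 1);
    topic.configs(Map.of(
        "retention.ms", "86400000",      // 1 day (stored but not enforced)
        "retention.bytes", "1073741824", // 1 GB (stored but not enforced)
        "compression.type", "lz4"        // Compress fetch responses with LZ4
    ));
    // createTopics is asynchronous; get() blocks until it completes (throws checked exceptions)
    admin.createTopics(List.of(topic)).all().get();
}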

To manually manage Redis Stream retention, use Redis’s XTRIM command:

# Trim by count (keep last 1000 messages)
XTRIM default:my-topic:0 MAXLEN ~ 1000

# Trim by age (keep messages newer than the given timestamp, e.g. now minus one hour; MINID requires Redis 6.2+)
XTRIM default:my-topic:0 MINID <timestamp-in-ms>-0
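Since stream IDs begin with a millisecond timestamp, the MINID threshold for an age-based trim is simply the cutoff time followed by -0. The sketch below shows one way to compute it; TrimByAge and its methods are hypothetical names, and the code only formats the command rather than sending it to Redis.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for illustration; it builds the command text only.
final class TrimByAge {
    // Returns "<cutoff-ms>-0", where cutoff = now - maxAge.
    static String minIdFor(Instant now, Duration maxAge) {
        long cutoffMs = now.minus(maxAge).toEpochMilli();
        return cutoffMs + "-0";
    }

    // Formats the XTRIM command that drops entries older than maxAge.
    static String xtrimCommand(String streamKey, Instant now, Duration maxAge) {
        return "XTRIM " + streamKey + " MINID " + minIdFor(now, maxAge);
    }

    public static void main(String[] args) {
        // Keep only the last hour of messages in one partition's stream.
        System.out.println(
            xtrimCommand("default:my-topic:0", Instant.now(), Duration.ofHours(1)));
    }
}
```

A scheduled job could run a command like this per partition until retention enforcement is available.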

Performance Tuning

Pipelining

Korvet automatically batches XADD operations using Lettuce’s command pipelining:

  • Multiple produce requests are batched together

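  • setAutoFlushCommands(false) buffers commands instead of writing each one to the connection immediately

  • flushCommands() writes all buffered commands to the connection at once, so a whole batch costs roughly one network round-trip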

  • Significantly improves throughput for high-volume producers
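To make the effect of manual flushing concrete, here is a toy model in plain Java. It does not use Lettuce and is not Korvet's code: BufferedCommandWriter and its methods are hypothetical, and a "network write" is simulated by recording each flushed batch. It only mirrors the behaviour described for setAutoFlushCommands(false) and flushCommands() above.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of auto-flush vs. manual flush; NOT Lettuce's API.
final class BufferedCommandWriter {
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> networkWrites = new ArrayList<>();
    private boolean autoFlush = true;

    void setAutoFlush(boolean autoFlush) {
        this.autoFlush = autoFlush;
    }

    // Queues a command; with auto-flush on, every command is its own write.
    void write(String command) {
        buffer.add(command);
        if (autoFlush) {
            flush();
        }
    }

    // Sends everything buffered so far as a single simulated network write.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        networkWrites.add(new ArrayList<>(buffer));
        buffer.clear();
    }

    int writeCount() {
        return networkWrites.size();
    }

    public static void main(String[] args) {
        BufferedCommandWriter writer = new BufferedCommandWriter();
        writer.setAutoFlush(false);
        for (int i = 0; i < 100; i++) {
            writer.write("XADD default:my-topic:0 * n " + i);
        }
        writer.flush();
        System.out.println(writer.writeCount()); // prints 1
    }
}
```

With auto-flush left on, the same loop would produce 100 separate writes, which is the overhead that batching avoids.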

Connection Pooling

Korvet uses Apache Commons Pool for Redis connection management:

korvet:
  redis:
    pool:
      max-active: 10      # Maximum connections
      max-idle: 10        # Maximum idle connections
      min-idle: 2         # Minimum idle connections
      max-wait: -1ms      # Wait time when pool exhausted (-1 = indefinite)
      time-between-eviction-runs: 60s  # Idle connection cleanup interval

Benefits:

  • Reuses connections across requests

  • Reduces connection overhead

  • Configurable pool size based on workload

Compression

Korvet implements server-side compression to reduce network bandwidth between the server and consumers:

Storage:

  • Messages are always stored uncompressed in Redis Streams

  • This enables better queryability and avoids decompression overhead in the storage layer

  • Each record is stored as individual fields in the stream

Producer side:

  • Kafka clients can send compressed or uncompressed batches

  • Korvet automatically decompresses incoming batches using Kafka’s API

  • Individual records are extracted and stored uncompressed in Redis

Consumer side:

  • When consumers fetch messages, Korvet compresses the response based on the topic’s compression.type configuration

  • Compression is applied per-topic, not per-producer

  • Supported types: NONE, GZIP, SNAPPY, LZ4, ZSTD

Benefits:

  • Reduces network bandwidth for fetch responses

  • Transparent to clients - works with all Kafka clients

  • Flexible per-topic configuration

  • No impact on storage layer performance
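The "store uncompressed, compress on fetch" flow can be illustrated with a small Java sketch. This is an assumption-laden model rather than Korvet's implementation: FetchCompression is a hypothetical class, it operates on a raw byte payload instead of Kafka record batches, and it supports only none and gzip because those need nothing beyond the JDK.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical sketch: compresses a fetch payload per the topic's compression.type.
final class FetchCompression {
    static byte[] encodeFetchPayload(byte[] uncompressed, String compressionType) {
        switch (compressionType.toLowerCase(Locale.ROOT)) {
            case "none":
                return uncompressed; // stored form is already uncompressed
            case "gzip": {
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
                    gz.write(uncompressed);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                return out.toByteArray();
            }
            default:
                // snappy, lz4, and zstd need third-party codecs, omitted here
                throw new UnsupportedOperationException(compressionType);
        }
    }

    // What a consumer-side client would do to read a gzip payload.
    static byte[] gunzip(byte[] compressed) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return in.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] stored = "{\"name\": \"Alice\", \"age\": 30}".repeat(50)
            .getBytes(StandardCharsets.UTF_8);
        byte[] sent = encodeFetchPayload(stored, "gzip");
        System.out.println(stored.length + " bytes stored, " + sent.length + " bytes sent");
    }
}
```

The compression type is chosen from the topic configuration at fetch time, so the same stored data can be served with a different codec if the topic setting changes.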

See Compression for configuration details.