Redis Streams (Hot Storage)

Korvet uses Redis Streams as the hot storage tier for recent messages.

Why Redis Streams?

  • Low latency: Sub-millisecond read/write performance

  • Consumer groups: Built-in support for coordinated consumption

  • Persistence: AOF and RDB for durability

  • Scalability: Capable of handling millions of messages per second, given suitable hardware and pipelining

Stream Structure

Each topic partition maps to a Redis Stream:

  • Stream key: {keyspace}:{topic}:{partition}, where the default tenant's keyspace is default (for example, default:my-topic:0)

  • Message ID: Redis-generated, timestamp-based ID in the form <milliseconds>-<sequence> (for example, 1700000000000-0)

  • Fields: Kafka record components mapped to stream fields
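The key and ID conventions above can be sketched as a pair of small helpers. This is a minimal illustration, assuming keys are built by plain concatenation with `:` separators (Kafka topic names cannot contain `:`, so the key stays unambiguous) and that IDs follow Redis's `<milliseconds>-<sequence>` format. The names `StreamKeys`, `streamKey`, `parseId` and `StreamId` are hypothetical, not Korvet's API; records require Java 16 or later.

```java
import java.util.Objects;

/** Hypothetical helpers illustrating the stream key and message ID conventions. */
public final class StreamKeys {

    /** A parsed stream entry ID: milliseconds since the epoch plus a sequence number. */
    public record StreamId(long timestampMs, long sequence) {
        @Override
        public String toString() {
            return timestampMs + "-" + sequence;
        }
    }

    private StreamKeys() {}

    /** Builds the stream key {keyspace}:{topic}:{partition}. */
    public static String streamKey(String keyspace, String topic, int partition) {
        Objects.requireNonNull(keyspace, "keyspace");
        Objects.requireNonNull(topic, "topic");
        if (partition < 0) {
            throw new IllegalArgumentException("partition must be >= 0");
        }
        return keyspace + ":" + topic + ":" + partition;
    }

    /** Parses an ID such as "1700000000000-0" into its timestamp and sequence parts. */
    public static StreamId parseId(String id) {
        int dash = id.indexOf('-');
        if (dash <= 0 || dash == id.length() - 1) {
            throw new IllegalArgumentException("not a stream ID: " + id);
        }
        return new StreamId(
                Long.parseLong(id.substring(0, dash)),
                Long.parseLong(id.substring(dash + 1)));
    }

    public static void main(String[] args) {
        System.out.println(streamKey("default", "my-topic", 0)); // default:my-topic:0
        StreamId id = parseId("1700000000000-3");
        System.out.println(id.timestampMs() + " " + id.sequence()); // 1700000000000 3
    }
}
```

Because the ID's first component is a timestamp, entries in each partition stream are ordered by arrival time, which is what makes age-based trimming with MINID possible.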

Message Format

Kafka records are decomposed into Redis Stream fields:

  • Key: Stored in __key field (if present)

  • Headers: Stored as __header.{name} fields (e.g., __header.content-type)

  • Value: JSON values are flattened: each top-level JSON field becomes its own stream field

Example: Simple JSON message
# Kafka record with JSON value
Key: "user123"
Value: {"name": "Alice", "age": 30}
Headers: content-type=application/json

# Stored in Redis Stream as:
XADD default:my-topic:0 * \
  __key "user123" \
  __header.content-type "application/json" \
  name "\"Alice\"" \
  age "30"
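The flattened value fields are JSON-encoded and stored as byte arrays. The value {"name": "Alice", "age": 30} becomes two separate fields, name and age: the string Alice is stored with its JSON quotes ("Alice") and the number 30 as 30. As the example shows, __key and the __header.* fields hold the key and header values without JSON quoting.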
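The decomposition can be sketched as follows. This is a minimal illustration, not Korvet's code: it assumes the value has already been parsed into a `Map<String, Object>` by some JSON library, that the key and headers are UTF-8 strings, and it uses a small hand-written JSON encoder for the field values. `RecordFlattener`, `flatten` and `toJson` are hypothetical names; Java 16 or later is assumed for pattern-matching `instanceof`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: map a Kafka record (key, headers, parsed JSON value) to stream fields. */
public final class RecordFlattener {

    private RecordFlattener() {}

    public static Map<String, String> flatten(String key, Map<String, String> headers,
                                              Map<String, Object> value) {
        Map<String, String> fields = new LinkedHashMap<>();
        if (key != null) {
            fields.put("__key", key); // stored as-is, not JSON-encoded
        }
        headers.forEach((name, v) -> fields.put("__header." + name, v));
        // Each top-level JSON field becomes its own stream field, JSON-encoded.
        value.forEach((name, v) -> fields.put(name, toJson(v)));
        return fields;
    }

    /** Minimal JSON encoder for strings, finite numbers, booleans, null, maps and lists. */
    public static String toJson(Object v) {
        if (v == null) {
            return "null";
        }
        if (v instanceof String s) {
            StringBuilder sb = new StringBuilder("\"");
            for (char c : s.toCharArray()) {
                if (c == '"' || c == '\\') {
                    sb.append('\\').append(c); // escape quote and backslash
                } else if (c < 0x20) {
                    sb.append(String.format("\\u%04x", (int) c)); // escape control characters
                } else {
                    sb.append(c);
                }
            }
            return sb.append('"').toString();
        }
        if (v instanceof Number || v instanceof Boolean) {
            return v.toString();
        }
        if (v instanceof Map<?, ?> m) {
            StringBuilder sb = new StringBuilder("{");
            String sep = "";
            for (Map.Entry<?, ?> e : m.entrySet()) {
                sb.append(sep).append(toJson(String.valueOf(e.getKey())))
                  .append(':').append(toJson(e.getValue()));
                sep = ",";
            }
            return sb.append('}').toString();
        }
        if (v instanceof List<?> list) {
            StringBuilder sb = new StringBuilder("[");
            String sep = "";
            for (Object item : list) {
                sb.append(sep).append(toJson(item));
                sep = ",";
            }
            return sb.append(']').toString();
        }
        throw new IllegalArgumentException("unsupported type: " + v.getClass().getName());
    }

    public static void main(String[] args) {
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("name", "Alice");
        value.put("age", 30);
        System.out.println(flatten("user123", Map.of("content-type", "application/json"), value));
        // {__key=user123, __header.content-type=application/json, name="Alice", age=30}
    }
}
```

Nested objects and arrays stay JSON-encoded inside a single field in this sketch, matching the "top-level fields only" rule above.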

Retention Policies

Retention policy enforcement is currently in development. Topic configurations can be set via Kafka’s CreateTopics API, but they are not yet enforced on Redis Streams.

The following topic settings are accepted and stored, but have no effect yet:

  • retention.ms: Time-based retention (default: 7 days) - not enforced

  • retention.bytes: Size-based retention (default: unlimited) - not enforced

  • compression.type: Compression type metadata (none, gzip, snappy, lz4, zstd) - stored only

Example: Create topic with retention configuration
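import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");

// AdminClient holds network resources, so close it when done
try (AdminClient admin = AdminClient.create(props)) {
    // 3 partitions, replication factor 1
    NewTopic topic = new NewTopic("my-topic", 3, (short) 1).configs(Map.of(
        "retention.ms", "86400000",      // 1 day (stored but not enforced)
        "retention.bytes", "1073741824", // 1 GiB (stored but not enforced)
        "compression.type", "lz4"        // Metadata only
    ));
    // createTopics is asynchronous; wait for the result (throws on failure)
    admin.createTopics(List.of(topic)).all().get();
}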

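To manually manage Redis Stream retention, run Redis’s XTRIM command against each partition's stream:

# Trim by count (keep roughly the last 1000 messages; ~ allows efficient approximate trimming)
XTRIM default:my-topic:0 MAXLEN ~ 1000

# Trim by age (drop entries older than one hour; MINID requires Redis 6.2+)
# Replace <timestamp-in-ms> with the Unix time in milliseconds one hour ago
XTRIM default:my-topic:0 MINID <timestamp-in-ms>-0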
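The age-based XTRIM needs a concrete threshold ID. Because Redis-generated IDs start with a millisecond timestamp, a retention window converts directly into a MINID value. A minimal sketch, assuming the caller supplies the current time and the desired window (for example, the stored retention.ms value) and runs the resulting command once per partition stream; `RetentionTrim`, `minIdFor` and `xtrimArgs` are hypothetical names.

```java
import java.time.Duration;
import java.util.List;

/** Hypothetical helper for building manual XTRIM commands from a retention window. */
public final class RetentionTrim {

    private RetentionTrim() {}

    /** Smallest stream ID to keep: entries with lower IDs are older than the window. */
    public static String minIdFor(long nowMs, Duration retention) {
        long cutoff = Math.max(0L, nowMs - retention.toMillis());
        return cutoff + "-0";
    }

    /** Builds the XTRIM arguments for one partition stream, e.g. to pass to redis-cli. */
    public static List<String> xtrimArgs(String streamKey, long nowMs, Duration retention) {
        return List.of("XTRIM", streamKey, "MINID", minIdFor(nowMs, retention));
    }

    public static void main(String[] args) {
        long now = 1_700_000_000_000L;
        // Keep the last hour: 1_700_000_000_000 - 3_600_000 = 1_699_996_400_000
        System.out.println(String.join(" ",
                xtrimArgs("default:my-topic:0", now, Duration.ofHours(1))));
        // XTRIM default:my-topic:0 MINID 1699996400000-0

        // The default retention.ms of 7 days expressed in milliseconds
        System.out.println(Duration.ofDays(7).toMillis()); // 604800000
    }
}
```

Passing System.currentTimeMillis() as nowMs gives the threshold for "now"; the clamp to zero only matters for windows longer than the time since the epoch.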

Performance Tuning

Pipelining

Korvet automatically batches XADD operations using Lettuce’s command pipelining:

  • Multiple produce requests are batched together

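  • setAutoFlushCommands(false) buffers commands instead of writing each one to the connection immediately

  • flushCommands() writes all buffered commands at once, so a batch costs roughly one network round-trip instead of one per command

  • This reduces per-command network overhead and improves throughput for high-volume producers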
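The effect of disabling auto-flush can be illustrated with a toy model that counts writes to the connection instead of talking to Redis. This is not Lettuce's implementation: `CommandBuffer`, `send` and `flush` are hypothetical names that only mirror the setAutoFlushCommands(false) / flushCommands() pattern described above.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of command flushing: counts connection writes instead of performing I/O. */
public final class CommandBuffer {
    private final boolean autoFlush;
    private final List<String> pending = new ArrayList<>();
    private int networkWrites = 0;
    private int commandsSent = 0;

    public CommandBuffer(boolean autoFlush) {
        this.autoFlush = autoFlush;
    }

    /** Queues a command; with auto-flush enabled it is written immediately. */
    public void send(String command) {
        pending.add(command);
        if (autoFlush) {
            flush();
        }
    }

    /** Writes every pending command as one batch (counted as one write). */
    public void flush() {
        if (pending.isEmpty()) {
            return;
        }
        networkWrites++;
        commandsSent += pending.size();
        pending.clear();
    }

    public int networkWrites() { return networkWrites; }

    public int commandsSent() { return commandsSent; }

    public static void main(String[] args) {
        for (boolean autoFlush : new boolean[] {true, false}) {
            CommandBuffer buf = new CommandBuffer(autoFlush);
            for (int i = 0; i < 100; i++) {
                buf.send("XADD default:my-topic:0 * name \"Alice\"");
            }
            buf.flush(); // no-op when auto-flush already wrote everything
            System.out.println("autoFlush=" + autoFlush + " writes=" + buf.networkWrites());
        }
        // autoFlush=true writes=100
        // autoFlush=false writes=1
    }
}
```

Both modes deliver the same 100 commands; only the number of writes differs, which is where the batching gains come from.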

Connection Pooling

Korvet uses Apache Commons Pool for Redis connection management:

korvet:
  redis:
    pool:
      max-active: 10      # Maximum connections
      max-idle: 10        # Maximum idle connections
      min-idle: 2         # Minimum idle connections
      max-wait: -1ms      # Wait time when pool exhausted (-1 = indefinite)
      time-between-eviction-runs: 60s  # Idle connection cleanup interval

Benefits:

  • Reuses connections across requests

  • Reduces connection setup overhead

  • Pool size is configurable to match the workload
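To make the pool settings concrete, here is a toy bounded pool showing how max-active, max-idle, min-idle and max-wait interact. It is a simplified illustration built on java.util.concurrent, not Apache Commons Pool's API or implementation: `TinyPool`, `borrow` and `release` are hypothetical names, min-idle objects are simply pre-created up front, and eviction (time-between-eviction-runs) is omitted.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Toy pool illustrating max-active, max-idle, min-idle and max-wait; not Commons Pool. */
public final class TinyPool<T> {
    private final Supplier<T> factory;
    private final int maxIdle;
    private final Semaphore active;          // one permit per borrowable object (max-active)
    private final Deque<T> idle = new ArrayDeque<>();

    public TinyPool(Supplier<T> factory, int maxActive, int maxIdle, int minIdle) {
        this.factory = factory;
        this.maxIdle = maxIdle;
        this.active = new Semaphore(maxActive, true);
        for (int i = 0; i < minIdle; i++) {
            idle.push(factory.get());        // pre-create min-idle objects
        }
    }

    /** Borrows an object; a negative maxWait waits indefinitely (like max-wait: -1ms). */
    public T borrow(Duration maxWait) {
        try {
            if (maxWait.isNegative()) {
                active.acquire();
            } else if (!active.tryAcquire(maxWait.toMillis(), TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("pool exhausted after waiting " + maxWait);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the pool", e);
        }
        synchronized (idle) {
            T reused = idle.poll();
            if (reused != null) {
                return reused;               // reuse an idle object instead of creating one
            }
        }
        try {
            return factory.get();            // no idle object available: create a new one
        } catch (RuntimeException e) {
            active.release();                // do not leak the permit on failure
            throw e;
        }
    }

    /** Returns an object; it stays idle only if fewer than maxIdle are already idle. */
    public void release(T obj) {
        synchronized (idle) {
            if (idle.size() < maxIdle) {
                idle.push(obj);
            }                                // a real pool would close the surplus object
        }
        active.release();
    }

    public int idleCount() {
        synchronized (idle) {
            return idle.size();
        }
    }
}
```

With the configuration shown earlier (max-active 10, max-idle 10, min-idle 2), the first two borrows reuse pre-created objects, and an eleventh concurrent borrow would block indefinitely because max-wait is -1ms.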

Compression

Message compression is not currently implemented in Korvet. Korvet does not compress messages before writing them to Redis Streams.

The compression.type topic configuration is stored as metadata but not used:

  • Kafka clients can compress messages before sending (client-side compression)

  • Korvet stores the compressed bytes as-is in Redis

  • Consumers receive the same compressed bytes and decompress client-side

  • No server-side compression/decompression is performed
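The pass-through behaviour described above leaves decompression entirely to the consumer. A minimal sketch using the JDK's GZIP streams as a stand-in for whichever codec the client chose. It is a simplified model that treats the compressed payload as a single byte array (real Kafka clients compress whole record batches), and the server's "store as-is" step is modelled as a plain array copy; `CompressionPassThrough` and its methods are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical sketch: client-side compression with server-side pass-through. */
public final class CompressionPassThrough {

    private CompressionPassThrough() {}

    /** Producer side: compress the payload before sending. */
    public static byte[] compress(byte[] plain) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(plain);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray(); // read after close so the GZIP trailer is included
    }

    /** Server side, as described above: bytes are stored unchanged. */
    public static byte[] storeAsIs(byte[] received) {
        return Arrays.copyOf(received, received.length);
    }

    /** Consumer side: decompress the bytes it received. */
    public static byte[] decompress(byte[] compressed) {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return gz.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = "{\"name\": \"Alice\", \"age\": 30}".getBytes(StandardCharsets.UTF_8);
        byte[] stored = storeAsIs(compress(payload));
        System.out.println(new String(decompress(stored), StandardCharsets.UTF_8));
        // {"name": "Alice", "age": 30}
    }
}
```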