Redis Streams Storage

Korvet uses Redis Streams as its primary storage layer for all messages.

Why Redis Streams?

  • Low latency: Sub-millisecond read/write performance

  • Consumer groups: Built-in support for coordinated consumption

  • Persistence: AOF and RDB for durability

  • Scalability: Handle millions of messages per second

Stream Structure

Each topic partition maps to a Redis Stream:

  • Stream key: {keyspace}:stream:{topic}:{partition} (default keyspace: korvet)

  • Message ID: Redis-generated timestamp-based ID (format: timestamp-sequence)

  • Fields: Kafka record components mapped to stream fields
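The key layout and ID format above can be sketched with a small helper. The class and method names here are hypothetical and not part of Korvet's API; only the key pattern and the timestamp-sequence ID format come from this page.

```java
// Hypothetical helper for illustration; not part of Korvet's API.
final class StreamKeys {
    static final String DEFAULT_KEYSPACE = "korvet";

    /** Builds a key of the form {keyspace}:stream:{topic}:{partition}. */
    static String streamKey(String keyspace, String topic, int partition) {
        return keyspace + ":stream:" + topic + ":" + partition;
    }

    /** Splits a Redis Stream ID (timestamp-sequence) into {timestampMs, sequence}. */
    static long[] parseId(String id) {
        int dash = id.indexOf('-');
        if (dash < 0) {
            throw new IllegalArgumentException("Expected <timestamp>-<sequence>: " + id);
        }
        long timestampMs = Long.parseLong(id.substring(0, dash));
        long sequence = Long.parseLong(id.substring(dash + 1));
        return new long[] {timestampMs, sequence};
    }
}
```

For example, `StreamKeys.streamKey(StreamKeys.DEFAULT_KEYSPACE, "my-topic", 0)` yields `korvet:stream:my-topic:0`, the key used in the examples on this page.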

Message Format

Kafka records are decomposed into Redis Stream fields:

  • Key: Stored in __key field (if present)

  • Headers: Stored as __header.{name} fields (e.g., __header.content-type)

  • Value: Encoding depends on detected value type:

    • JSON: Top-level fields are flattened into separate stream fields

    • Raw bytes: Stored as single value field

JSON Value Example

Example: JSON message with flattened fields
# Kafka record with JSON value
Key: "user123"
Value: {"name": "Alice", "age": 30}
Headers: content-type=application/json

# Stored in Redis Stream as:
XADD korvet:stream:my-topic:0 * \
  __key "user123" \
  __header.content-type "application/json" \
  name "\"Alice\"" \
  age "30"
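JSON values are flattened: each top-level JSON field becomes a separate stream field. As the example shows, scalar values keep their JSON encoding (the string "Alice" is stored with its quotes). Nested objects and arrays are not flattened further; they are stored JSON-encoded as byte arrays.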

Raw Bytes Value Example

Example: Non-JSON message
# Kafka record with raw bytes
Key: "order456"
Value: <binary data>
Headers: content-type=application/octet-stream

# Stored in Redis Stream as:
XADD korvet:stream:my-topic:0 * \
  __key "order456" \
  __header.content-type "application/octet-stream" \
  value <binary data>
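The two layouts shown above can be sketched as a mapping from a record to an ordered field map. This is a simplified illustration, not Korvet's implementation: the class name is hypothetical, and the JSON case assumes the top-level fields have already been parsed and re-encoded as JSON text (for example `"\"Alice\""` and `"30"`), because the Java standard library has no JSON parser.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Korvet's API.
final class RecordFields {
    /** Adds the __key and __header.{name} fields shared by both layouts. */
    static Map<String, Object> common(String key, Map<String, String> headers) {
        Map<String, Object> fields = new LinkedHashMap<>();
        if (key != null) {
            fields.put("__key", key); // only written when the record has a key
        }
        headers.forEach((name, value) -> fields.put("__header." + name, value));
        return fields;
    }

    /** JSON layout: one stream field per top-level JSON field (values assumed pre-encoded). */
    static Map<String, Object> forJson(String key, Map<String, String> headers,
                                       Map<String, String> topLevelFields) {
        Map<String, Object> fields = common(key, headers);
        fields.putAll(topLevelFields);
        return fields;
    }

    /** Raw layout: the whole payload goes into a single "value" field. */
    static Map<String, Object> forRaw(String key, Map<String, String> headers, byte[] value) {
        Map<String, Object> fields = common(key, headers);
        fields.put("value", value);
        return fields;
    }
}
```

The resulting map corresponds to the field/value pairs passed to XADD in the examples.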

Value Type Detection

Korvet automatically detects the value type when the first message is produced to a topic:

  • JSON: If the value is valid JSON (starts with { or [)

  • Raw bytes: Otherwise

The detected type is stored in the topic configuration and used for all subsequent messages.
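A minimal sketch of the prefix check described above. It only inspects the first non-whitespace byte; skipping leading whitespace and omitting full JSON validation are assumptions made for brevity, and the class and enum names are hypothetical.

```java
// Hypothetical helper for illustration; not part of Korvet's API.
final class ValueTypeDetector {
    enum ValueType { JSON, RAW }

    /** Classifies a payload by its first non-whitespace byte ({ or [ means JSON). */
    static ValueType detect(byte[] value) {
        if (value == null) {
            return ValueType.RAW;
        }
        for (byte b : value) {
            if (b == ' ' || b == '\t' || b == '\n' || b == '\r') {
                continue; // skip JSON whitespace
            }
            return (b == '{' || b == '[') ? ValueType.JSON : ValueType.RAW;
        }
        return ValueType.RAW; // empty or whitespace-only payload
    }
}
```

A stricter detector would also parse the payload to confirm it is valid JSON before committing the topic to the JSON layout.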

Retention Policies

Korvet enforces retention policies at write time using the MAXLEN and MINID trimming arguments of the Redis XADD command. The following topic-level settings apply:

  • retention.ms: Time-based retention (default: 7 days)

  • retention.bytes: Size-based retention (default: unlimited)

  • compression.type: Compression type for fetch responses (none, gzip, snappy, lz4, zstd)

How Retention Works

When producing messages, Korvet applies retention policies using XADD arguments:

  • Time-based: MINID argument trims messages older than retention.ms

  • Size-based: MAXLEN argument limits stream to approximate message count based on retention.bytes

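# Example: XADD with size-based retention (approximate MAXLEN)
XADD korvet:stream:my-topic:0 MAXLEN ~ 1000 * __key "..." value "..."

# Example: XADD with time-based retention (MINID)
XADD korvet:stream:my-topic:0 MINID 1234567890000-0 * __key "..." value "..."

Redis accepts one trimming strategy (MAXLEN or MINID) per XADD call, so each command above applies a single policy.

  • Retention is enforced at produce time, not as a background process

  • MAXLEN uses approximate trimming (~) for better performance

  • Size-based retention derives the message count from retention.bytes and the average message size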
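The two thresholds can be derived as in the sketch below. It assumes the average message size is supplied by the caller; how Korvet tracks that average is not described here, and the class and method names are hypothetical.

```java
// Hypothetical helper for illustration; not part of Korvet's API.
final class RetentionArgs {
    /** MINID threshold: entries with IDs older than (now - retentionMs) may be trimmed. */
    static String minId(long nowMs, long retentionMs) {
        return Math.max(0L, nowMs - retentionMs) + "-0";
    }

    /** MAXLEN threshold: approximate number of entries that fit in retentionBytes. */
    static long maxLen(long retentionBytes, long avgMessageBytes) {
        if (avgMessageBytes <= 0) {
            throw new IllegalArgumentException("avgMessageBytes must be positive");
        }
        return Math.max(1L, retentionBytes / avgMessageBytes);
    }
}
```

With `retention.bytes` set to 1073741824 and an assumed average of 1024 bytes per message, `maxLen` returns 1048576. Calling `minId(System.currentTimeMillis(), 3_600_000L)` produces a threshold for entries older than one hour.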

Configuring Retention

Set retention when creating topics via Kafka Admin API:

Example: Create topic with retention configuration
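import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");

// try-with-resources closes the admin client when done
try (AdminClient admin = AdminClient.create(props)) {
    NewTopic topic = new NewTopic("my-topic", 3, (short) 1); // 3 partitions, replication factor 1
    topic.configs(Map.of(
        "retention.ms", "86400000",      // 1 day
        "retention.bytes", "1073741824", // 1 GiB
        "compression.type", "lz4"        // Compress fetch responses with LZ4
    ));
    // createTopics is asynchronous; get() blocks until done and throws checked exceptions
    admin.createTopics(List.of(topic)).all().get();
}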

Or configure defaults in application.yml:

korvet:
  server:
    default-retention-time: 7d
    default-retention-size: 10GB
    default-compression: lz4

Manual Trimming

For manual stream management, use Redis’s XTRIM command:

# Trim by count (keep last 1000 messages)
XTRIM korvet:stream:my-topic:0 MAXLEN ~ 1000

# Trim by age (keep messages from last hour)
XTRIM korvet:stream:my-topic:0 MINID <timestamp-in-ms>-0

Performance Tuning

Async Operations

Korvet uses asynchronous Redis operations for maximum throughput:

  • All Redis commands use Lettuce’s async API (RedisFuture)

  • Operations return CompletableFuture to avoid blocking

  • Multiple operations execute in parallel

  • Netty event loop threads remain non-blocking

Benefits:

  • Higher throughput with fewer threads

  • Better resource utilization

  • Reduced latency under load
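The non-blocking pattern described above can be illustrated with plain CompletableFuture. The `fakeXadd` method is a stand-in for an asynchronous Redis write, not a Lettuce call, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Illustrative only: simulates async writes without a Redis client.
final class AsyncSketch {
    /** Stand-in for an async XADD; a real client would complete the future from its I/O thread. */
    static CompletableFuture<String> fakeXadd(String streamKey, int n) {
        return CompletableFuture.supplyAsync(() -> streamKey + "@" + n);
    }

    /** Starts every write without waiting, then combines the results once all have completed. */
    static CompletableFuture<List<String>> writeAll(String streamKey, int count) {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            pending.add(fakeXadd(streamKey, i)); // returns immediately
        }
        return CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]))
            .thenApply(done -> pending.stream()
                .map(CompletableFuture::join) // safe: all futures are complete here
                .collect(Collectors.toList()));
    }
}
```

The caller receives a single future and can attach further stages to it, so no thread blocks while the individual writes are in flight.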

Pipelining

Korvet automatically batches Redis operations using Lettuce’s command pipelining:

  • Multiple commands are batched together

  • setAutoFlushCommands(false) buffers commands instead of writing each one to the connection immediately

  • flushCommands() sends all commands in a single network round-trip

  • Significantly improves throughput for high-volume producers

Example: Producing 1000 messages sends 1000 XADD commands in a single pipeline instead of 1000 round-trips.
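The effect of disabling auto-flush can be modelled without a Redis connection. The class below is a toy model of buffer-then-flush behaviour, not Lettuce itself: it borrows the two method names mentioned above and counts how many batches would be written to the network.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model for illustration; it does not talk to Redis or use Lettuce.
final class PipelineModel {
    private final List<String> buffer = new ArrayList<>();
    private boolean autoFlush = true;
    private int roundTrips = 0;

    void setAutoFlushCommands(boolean autoFlush) {
        this.autoFlush = autoFlush;
    }

    /** Queues a command; with auto-flush enabled, every command is sent on its own. */
    void send(String command) {
        buffer.add(command);
        if (autoFlush) {
            flushCommands();
        }
    }

    /** Sends everything buffered so far as one batch. */
    void flushCommands() {
        if (buffer.isEmpty()) {
            return;
        }
        roundTrips++;
        buffer.clear();
    }

    int roundTrips() {
        return roundTrips;
    }
}
```

In this model, sending 1000 commands with auto-flush disabled and then calling `flushCommands()` once counts a single round-trip, while the default setting counts 1000.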

Connection Pooling

Korvet uses Apache Commons Pool for Redis connection management:

korvet:
  redis:
    pool:
      max-active: 10      # Maximum connections
      max-idle: 10        # Maximum idle connections
      min-idle: 2         # Minimum idle connections
      max-wait: -1ms      # Wait time when pool exhausted (-1 = indefinite)
      time-between-eviction-runs: 60s  # Idle connection cleanup interval

Benefits:

  • Reuses connections across requests

  • Reduces connection overhead

  • Configurable pool size based on workload

Compression

Korvet implements server-side compression to reduce network bandwidth between the server and consumers:

Storage:

  • Messages are always stored uncompressed in Redis Streams

  • This enables better queryability and avoids decompression overhead in the storage layer

  • Each record is stored as individual fields in the stream

Producer side:

  • Kafka clients can send compressed or uncompressed batches

  • Korvet automatically decompresses incoming batches using Kafka’s API

  • Individual records are extracted and stored uncompressed in Redis

Consumer side:

  • When consumers fetch messages, Korvet compresses the response based on the topic’s compression.type configuration

  • Compression is applied per-topic, not per-producer

  • Supported types: NONE, GZIP, SNAPPY, LZ4, ZSTD

Benefits:

  • Reduces network bandwidth for fetch responses

  • Transparent to clients - works with all Kafka clients

  • Flexible per-topic configuration

  • No impact on storage layer performance
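The consumer-side step can be sketched as choosing a codec from the topic's compression.type. This is a simplified illustration on a plain byte array, not Korvet's implementation: real fetch responses compress Kafka record batches, and only gzip is covered here because snappy, lz4, and zstd require third-party libraries. The class name is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Locale;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper for illustration; not part of Korvet's API.
final class FetchCompression {
    /** Compresses a payload according to compression.type (only none and gzip are handled). */
    static byte[] compress(String compressionType, byte[] payload) {
        switch (compressionType.toLowerCase(Locale.ROOT)) {
            case "none":
                return payload; // stored form is already uncompressed
            case "gzip":
                try {
                    ByteArrayOutputStream out = new ByteArrayOutputStream();
                    try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
                        gz.write(payload);
                    } // closing the stream writes the gzip trailer
                    return out.toByteArray();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            default:
                throw new IllegalArgumentException(
                    "Codec not covered by this sketch: " + compressionType);
        }
    }

    /** Reverses the gzip case, as a consuming client would. */
    static byte[] gunzip(byte[] compressed) {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return gz.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the stored data is uncompressed, the codec can differ per topic without touching what is written to Redis.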

See Compression for configuration details.