Redis Streams Storage

Korvet uses Redis Streams as its primary storage layer for all messages.

Why Redis Streams?

  • Low latency: Sub-millisecond read/write performance

  • Consumer groups: Built-in support for coordinated consumption

  • Persistence: AOF and RDB for durability

  • Scalability: Handle millions of messages per second

Stream Structure

Each topic partition maps to one or more Redis Streams using time-based bucketing:

  • Stream key format: {keyspace}:stream:{topic}:{partition}:{hashTag}:b{epoch}

    • keyspace: Namespace prefix (default: korvet)

    • topic: Kafka topic name

    • partition: Partition number

    • hashTag: Hash slot tag for Redis Cluster (e.g., {abc})

    • epoch: Bucket timestamp in milliseconds

  • Message ID: Redis-generated timestamp-based ID (format: timestamp-sequence)

  • Fields: Kafka record components mapped to stream fields
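The key format above can be sketched as a small helper. Names and the exact placement of the `{hashTag}` braces are assumptions for illustration, not Korvet's actual code:

```java
// Illustrative sketch: assemble a bucket stream key from its components.
public class StreamKeyExample {

    static String streamKey(String keyspace, String topic, int partition,
                            String hashTag, long bucketEpochMs) {
        return String.format("%s:stream:%s:%d:{%s}:b%d",
                keyspace, topic, partition, hashTag, bucketEpochMs);
    }

    public static void main(String[] args) {
        System.out.println(streamKey("korvet", "my-topic", 0, "abc", 1741852800000L));
        // korvet:stream:my-topic:0:{abc}:b1741852800000
    }
}
```

The `{hashTag}` segment ensures that all buckets of a partition hash to the same Redis Cluster slot, so multi-key operations on them stay valid.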

Time Bucketing

Messages are distributed across time-based buckets (default: 1 hour). This enables:

  • Efficient retention: Delete entire buckets when data expires

  • Archival: Move old buckets to cold storage without blocking reads

  • Index optimization: Segment index tracks bucket boundaries
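Bucketing itself is just flooring the record timestamp to the bucket duration. A minimal sketch (helper names are assumptions, not Korvet's API):

```java
// Illustrative sketch: map a record timestamp to its time bucket.
public class TimeBucketExample {

    static final long BUCKET_MS = 60 * 60 * 1000L; // default: 1-hour buckets

    // Floor the record timestamp to the start of its bucket.
    static long bucketEpoch(long recordTimestampMs) {
        return Math.floorDiv(recordTimestampMs, BUCKET_MS) * BUCKET_MS;
    }

    public static void main(String[] args) {
        long t1 = 1741854000000L;          // 20 minutes into the hour
        long t2 = t1 + 10 * 60 * 1000L;    // 10 minutes later, same hour
        System.out.println(bucketEpoch(t1));                    // 1741852800000
        System.out.println(bucketEpoch(t1) == bucketEpoch(t2)); // true
    }
}
```

Because retention works at bucket granularity, expiring an hour of data is a single `DEL` of that bucket's stream key rather than trimming individual entries.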

Message Format

Kafka records are decomposed into Redis Stream fields:

  • Key: Stored in __key field (if present)

  • Headers: Stored as __header.{name} fields (e.g., __header.content-type)

  • Value: Encoding depends on detected value type:

    • JSON: Top-level fields are flattened into separate stream fields

    • Raw bytes: Stored as single value field

JSON Value Example

Example: JSON message with flattened fields
# Kafka record with JSON value
Key: "user123"
Value: {"name": "Alice", "age": 30}
Headers: content-type=application/json

# Stored in Redis Stream as (bucket key includes hash tag and epoch):
XADD korvet:stream:my-topic:0:{abc}:b1741852800000 * \
  __key "user123" \
  __header.content-type "application/json" \
  name "\"Alice\"" \
  age "30"
JSON values are flattened - each top-level JSON field becomes a separate stream field. Nested objects and arrays are JSON-encoded as byte arrays.
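The flattening rule can be sketched as follows. The record value is shown post-parse as a `Map`, the helper names are assumptions, and the JSON encoder is deliberately minimal (strings, numbers, and maps only) to keep the sketch self-contained:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch (not Korvet's actual code): top-level fields become
// stream fields; nested objects are re-encoded as JSON text.
public class FlattenExample {

    static Map<String, String> flatten(Map<String, Object> jsonObject) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : jsonObject.entrySet()) {
            fields.put(e.getKey(), encode(e.getValue()));
        }
        return fields;
    }

    // Just enough JSON encoding for this sketch.
    static String encode(Object v) {
        if (v instanceof String) {
            return "\"" + v + "\"";
        }
        if (v instanceof Map) {
            StringBuilder sb = new StringBuilder("{");
            for (Map.Entry<?, ?> e : ((Map<?, ?>) v).entrySet()) {
                if (sb.length() > 1) sb.append(",");
                sb.append("\"").append(e.getKey()).append("\":")
                  .append(encode(e.getValue()));
            }
            return sb.append("}").toString();
        }
        return String.valueOf(v); // numbers, booleans, null
    }

    public static void main(String[] args) {
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("name", "Alice");
        value.put("age", 30);
        value.put("address", Map.of("city", "Oslo")); // nested: stays JSON-encoded
        System.out.println(flatten(value));
        // {name="Alice", age=30, address={"city":"Oslo"}}
    }
}
```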

Raw Bytes Value Example

Example: Non-JSON message
# Kafka record with raw bytes
Key: "order456"
Value: <binary data>
Headers: content-type=application/octet-stream

# Stored in Redis Stream as:
XADD korvet:stream:my-topic:0:{abc}:b1741852800000 * \
  __key "order456" \
  __header.content-type "application/octet-stream" \
  value <binary data>

Value Type Detection

Korvet automatically detects the most storage-efficient value type when the first message is produced to a topic. The detection algorithm accounts for Redis Streams' internal delta compression to make optimal choices:

Decision logic:

  • Small messages (<1 KB): Uses JSON flattening

    • Benefits from Redis delta compression (field names stored once per ~100 entries)

    • Achieves negative overhead (-19% to -36%) compared to RAW storage

  • Large messages (≥10 KB) with compression enabled: Uses RAW storage

    • Compression provides significant reduction (up to 51x with ZSTD)

    • Avoids JSON parsing overhead

  • Medium messages (1-10 KB): Compares amortized storage sizes

    • Calculates JSON storage cost accounting for delta compression

    • Calculates RAW storage cost with compression

    • Chooses the more efficient option

  • Non-JSON data: Always uses RAW storage

    • Includes JSON arrays, primitives, and binary data

The detected type is stored in the topic configuration and used for all subsequent messages.
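The decision logic above can be expressed as a function. The thresholds mirror the prose; the amortized-size comparison for medium messages is passed in pre-computed to keep the sketch simple, and all names and signatures are assumptions, not Korvet's actual API:

```java
// Sketch of the value type detection rules described above.
public class ValueTypeDetection {

    enum ValueType { JSON_FLATTENED, RAW }

    static ValueType detect(boolean isJsonObject, int sizeBytes,
                            boolean compressionEnabled,
                            long amortizedJsonCost, long rawCost) {
        if (!isJsonObject) return ValueType.RAW;      // arrays, primitives, binary
        if (sizeBytes < 1_024) return ValueType.JSON_FLATTENED;
        if (sizeBytes >= 10_240 && compressionEnabled) return ValueType.RAW;
        // Medium messages: compare amortized storage costs of both options.
        return amortizedJsonCost <= rawCost ? ValueType.JSON_FLATTENED : ValueType.RAW;
    }

    public static void main(String[] args) {
        System.out.println(detect(true, 500, true, 0, 0));    // JSON_FLATTENED
        System.out.println(detect(true, 50_000, true, 0, 0)); // RAW
        System.out.println(detect(false, 500, true, 0, 0));   // RAW
    }
}
```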

Value Mapper Selection

Choosing between JSON flattening and RAW storage with compression significantly impacts memory usage and performance.

JSON Flattening (Default for JSON Objects)

How it works:

  • Top-level JSON fields are extracted and stored as separate Redis Stream fields

  • Nested objects and arrays are JSON-encoded as individual field values

  • Leverages Redis Streams' listpack encoding for efficient storage

Memory characteristics:

  • Efficient for shallow JSON with many small top-level fields

  • Redis listpack encoding optimizes storage of multiple small values

  • No compression overhead - relies on Redis internal optimizations

Best for:

  • ✓ Shallow JSON objects (1-2 levels deep) with 10+ top-level fields

  • ✓ Small messages (<1KB) where compression overhead dominates

  • ✓ Varied field values (UUIDs, timestamps, user data)

  • ✓ When you need field-level access in Redis (queries, filtering)

  • ✓ JSON carrying random or already-compressed payloads (images, encrypted data), where compression would add cost without benefit

Example use cases:

  • User profiles: {"id": 123, "name": "Alice", "email": "…​", "age": 30, …​}

  • Order records with many top-level attributes

  • Event data with flat structure

RAW Storage with Compression

How it works:

  • Entire value stored as single __value field

  • Optional compression (GZIP, SNAPPY, LZ4, ZSTD) applied to the field

  • Compression is transparent - automatically decompressed on read

Memory characteristics:

  • ZSTD achieves up to 51x compression on repetitive data

  • Compression overhead: ~5-10 bytes per message + CPU cost

  • Most effective on large messages (>10KB)

Best for:

  • ✓ Deeply nested JSON (>2-3 levels) that can’t be flattened

  • ✓ Large messages (>10KB) where compression overhead is amortized

  • ✓ Repetitive data patterns (logs, telemetry, structured text)

  • ✓ JSON with large nested arrays or objects

  • ✓ Binary formats with patterns (Protocol Buffers, Avro)

Example use cases:

  • Log messages with repeated structure

  • Telemetry data with nested metrics

  • Large JSON documents with deep nesting

  • Event streams with repetitive patterns

Performance Comparison

Based on production testing with 1000 messages × 100KB:

Scenario                    JSON Flattened   RAW+ZSTD   Winner
Shallow JSON (many fields)  49.98 MB         97.76 MB   JSON (2x better)
Deep JSON (10 levels)       16.81 MB         0.93 MB    RAW+ZSTD (18x better)
Wide JSON (100 fields)      12.17 MB         0.93 MB    RAW+ZSTD (13x better)

Configuring Storage Compression

Storage compression is configured per-topic and applies to the __value field for RAW storage:

Example: Create topic with storage compression
NewTopic topic = new NewTopic("logs-topic", 3, (short) 1);
topic.configs(Map.of(
    "value.type", "raw",              // Use RAW storage
    "storage.compression", "zstd"     // Compress with ZSTD
));
admin.createTopics(List.of(topic));

Compression types:

  • none - No compression (default)

  • gzip - Good compression ratio, slower (26x compression)

  • snappy - Fast, moderate compression (10x compression)

  • lz4 - Very fast, good compression (14x compression)

  • zstd - Best compression ratio, fast (51x compression) - Recommended

Storage compression is independent of Kafka protocol compression (compression.type). Storage compression applies to data at rest in Redis, while protocol compression applies to data in transit between Kafka clients and Korvet.

Decision Guide

Use this flowchart to choose the right value mapper:

Is your data JSON?
├─ No → Use RAW + ZSTD compression
└─ Yes
   ├─ Is it deeply nested (>2 levels)?
   │  └─ Yes → Use RAW + ZSTD compression
   └─ No
      ├─ Does it have many top-level fields (>10)?
      │  └─ Yes → Use JSON flattening
      └─ No
         ├─ Is message size large (>10KB)?
         │  └─ Yes → Use RAW + ZSTD compression
         └─ No → Use JSON flattening
When in doubt, use RAW + ZSTD for large messages (>10KB) and JSON flattening for small messages (<1KB). ZSTD compression is very effective on most data types.
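The flowchart above, expressed as a function. The inputs (nesting depth, top-level field count) are assumed to come from inspecting a sample message; this is a rule-of-thumb helper for choosing topic configuration, not part of Korvet's API:

```java
// The decision guide flowchart as code.
public class MapperDecision {

    static String choose(boolean isJson, int nestingDepth,
                         int topLevelFields, int sizeBytes) {
        if (!isJson) return "RAW+ZSTD";
        if (nestingDepth > 2) return "RAW+ZSTD";
        if (topLevelFields > 10) return "JSON flattening";
        if (sizeBytes > 10_240) return "RAW+ZSTD";
        return "JSON flattening";
    }

    public static void main(String[] args) {
        System.out.println(choose(true, 10, 5, 2_000)); // RAW+ZSTD (deep nesting)
        System.out.println(choose(true, 1, 20, 2_000)); // JSON flattening (many fields)
        System.out.println(choose(false, 0, 0, 100));   // RAW+ZSTD (not JSON)
    }
}
```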

Retention Policies

Korvet enforces retention policies at write time using Redis Stream’s MAXLEN and MINID arguments:

  • retention.ms: Time-based retention (default: 7 days)

  • retention.bytes: Size-based retention (default: unlimited)

  • compression.type: Compression type for fetch responses (none, gzip, snappy, lz4, zstd)

How Retention Works

When producing messages, Korvet applies retention policies using XADD arguments:

  • Time-based: MINID argument trims messages older than retention.ms

  • Size-based: MAXLEN argument limits the stream to a message count derived from retention.bytes and the average message size

# Example: XADD with exact retention (default)
XADD korvet:stream:my-topic:0 MAXLEN 1000 MINID 1234567890000-0 * __key "..." value "..."

# Example: XADD with approximate retention (when approximate.trimming=true)
XADD korvet:stream:my-topic:0 MAXLEN ~ 1000 MINID 1234567890000-0 * __key "..." value "..."

  • Retention is enforced at produce time, not as a background process

  • By default, MAXLEN uses exact trimming for predictable retention behavior

  • Set approximate.trimming=true per-topic to use approximate trimming (~) for better performance at the cost of retention precision

  • Size-based retention calculates message count from average message size
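A sketch of how the XADD trim arguments could be derived from the topic's retention settings. The helper names and the averaging strategy are assumptions for illustration:

```java
// Illustrative sketch: derive XADD trim arguments from retention config.
public class RetentionArgs {

    // MINID: trim entries older than retention.ms.
    static String minId(long nowMs, long retentionMs) {
        return (nowMs - retentionMs) + "-0";
    }

    // MAXLEN: approximate message count allowed by retention.bytes,
    // based on the observed average message size.
    static long maxLen(long retentionBytes, long avgMessageBytes) {
        return Math.max(1, retentionBytes / avgMessageBytes);
    }

    public static void main(String[] args) {
        // 7-day time retention from a fixed "now":
        System.out.println(minId(1741852800000L, 7L * 24 * 60 * 60 * 1000));
        // 1741248000000-0

        // 1 GB size retention with 1 KB average messages:
        System.out.println(maxLen(1_073_741_824L, 1_024)); // 1048576
    }
}
```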

Configuring Retention

Set retention when creating topics via Kafka Admin API:

Example: Create topic with retention configuration
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");

AdminClient admin = AdminClient.create(props);
NewTopic topic = new NewTopic("my-topic", 3, (short) 1);
topic.configs(Map.of(
    "retention.ms", "86400000",      // 1 day
    "retention.bytes", "1073741824", // 1 GB
    "compression.type", "lz4"        // Compress fetch responses with LZ4
));
admin.createTopics(List.of(topic));

Or configure defaults in application.yml:

korvet:
  topics:
    retention-time: 7d
    retention-bytes: 10GB
    compression: lz4

Manual Trimming

For manual stream management, use Redis’s XTRIM command:

# Trim by count (keep last 1000 messages)
XTRIM korvet:stream:my-topic:0 MAXLEN ~ 1000

# Trim by age (keep messages from last hour)
XTRIM korvet:stream:my-topic:0 MINID <timestamp-in-ms>-0

Performance Tuning

Async Operations

Korvet uses asynchronous Redis operations for maximum throughput:

  • All Redis commands use Lettuce’s async API (RedisFuture)

  • Operations return CompletableFuture to avoid blocking

  • Multiple operations execute in parallel

  • Netty event loop threads remain non-blocking

Benefits:

  • Higher throughput with fewer threads

  • Better resource utilization

  • Reduced latency under load

Pipelining

Korvet automatically batches Redis operations using Lettuce’s command pipelining:

  • Multiple commands are batched together

  • setAutoFlushCommands(false) delays command execution

  • flushCommands() sends all commands in a single network round-trip

  • Significantly improves throughput for high-volume producers

Example: Producing 1000 messages sends 1000 XADD commands in a single pipeline instead of 1000 round-trips. The pool size is configurable based on workload.

Compression Types

Korvet supports two types of compression:

1. Storage Compression (New)

Compresses individual message values at rest in Redis Streams:

  • Where: Applied to the __value field in Redis Streams

  • When: At write time (produce) and read time (fetch)

  • Configuration: storage.compression topic config (none, gzip, snappy, lz4, zstd)

  • Use case: Reduce Redis memory usage for large or repetitive data

How it works:

  • Producer writes: Value is compressed before storing in Redis

  • Consumer reads: Value is decompressed when fetching from Redis

  • Transparent to Kafka clients - they receive uncompressed data

  • Independent of Kafka protocol compression

Benefits:

  • Reduces Redis memory usage by up to 51x (ZSTD)

  • Lower storage costs

  • Faster Redis persistence (smaller AOF/RDB files)

  • No impact on Kafka client compatibility
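The savings come from how well repetitive values compress. ZSTD is not in the JDK, so this sketch uses java.util.zip.Deflater (DEFLATE) as a stand-in to show the effect on repetitive data such as log lines; the actual ratio depends on the codec and data:

```java
import java.util.zip.Deflater;

// Demonstrate compression of a repetitive payload with a stdlib codec.
public class StorageCompressionDemo {

    // Compress with DEFLATE at maximum level and return the compressed size.
    static int deflatedSize(byte[] input) {
        Deflater deflater = new Deflater(Deflater.BEST_COMPRESSION);
        deflater.setInput(input);
        deflater.finish();
        byte[] buf = new byte[input.length];
        int n = deflater.deflate(buf);
        deflater.end();
        return n;
    }

    public static void main(String[] args) {
        // Repetitive payload: the kind of data the 51x ZSTD figure relies on.
        byte[] logs = "2025-01-01 INFO request handled in 3ms\n"
                .repeat(1_000).getBytes();
        int compressed = deflatedSize(logs);
        System.out.printf("raw=%d bytes, compressed=%d bytes (%.0fx smaller)%n",
                logs.length, compressed, (double) logs.length / compressed);
    }
}
```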

See Value Mapper Selection for guidance on when to use storage compression.

2. Protocol Compression (Kafka Standard)

Compresses Kafka protocol messages between clients and Korvet:

  • Where: Applied to Kafka Fetch/Produce request/response payloads

  • When: During network transmission

  • Configuration: compression.type topic config (none, gzip, snappy, lz4, zstd)

  • Use case: Reduce network bandwidth between Kafka clients and Korvet

How it works:

  • Producer side: Kafka clients can send compressed or uncompressed batches; Korvet decompresses them before storing

  • Consumer side: Korvet compresses fetch responses based on topic’s compression.type configuration

  • Storage: Messages are stored uncompressed in Redis (unless storage compression is enabled)

Benefits:

  • Reduces network bandwidth for fetch responses

  • Transparent to clients - works with all Kafka clients

  • Flexible per-topic configuration

  • Standard Kafka feature

Compression Comparison

Feature           Storage Compression              Protocol Compression
Purpose           Reduce Redis memory usage        Reduce network bandwidth
Applied at        Redis storage layer              Kafka protocol layer
Configuration     storage.compression              compression.type
Affects           Redis memory, persistence        Network traffic
Transparent to    Kafka clients                    Storage layer
Recommended for   Large messages, repetitive data  High-throughput consumers

You can use both types of compression together! For example, use storage.compression=zstd to save Redis memory and compression.type=lz4 for fast network compression.

See Compression for protocol compression configuration details.