# Redis Streams Storage

Korvet uses Redis Streams as its primary storage layer for all messages.

## Why Redis Streams?

- **Low latency**: Sub-millisecond read/write performance
- **Consumer groups**: Built-in support for coordinated consumption
- **Persistence**: AOF and RDB for durability
- **Scalability**: Handles millions of messages per second
## Stream Structure

Each topic partition maps to one or more Redis Streams using time-based bucketing:

- **Stream key format**: `{keyspace}:stream:{topic}:{partition}:{hashTag}:b{epoch}`
  - `keyspace`: Namespace prefix (default: `korvet`)
  - `topic`: Kafka topic name
  - `partition`: Partition number
  - `hashTag`: Hash slot tag for Redis Cluster (e.g., `{abc}`)
  - `epoch`: Bucket timestamp in milliseconds
- **Message ID**: Redis-generated timestamp-based ID (format: `timestamp-sequence`)
- **Fields**: Kafka record components mapped to stream fields
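The key format and bucketing scheme above can be sketched as plain string assembly. This is an illustrative sketch only: the bucket interval and the class/method names are assumptions, not Korvet's actual implementation.

```java
public class StreamKeys {
    // Floor a record timestamp to the start of its time bucket
    // (the bucket interval is an assumed, configurable value).
    static long bucketEpoch(long timestampMs, long bucketIntervalMs) {
        return (timestampMs / bucketIntervalMs) * bucketIntervalMs;
    }

    // Build the stream key: {keyspace}:stream:{topic}:{partition}:{hashTag}:b{epoch}
    static String streamKey(String keyspace, String topic, int partition,
                            String hashTag, long epochMs) {
        return keyspace + ":stream:" + topic + ":" + partition + ":"
                + hashTag + ":b" + epochMs;
    }
}
```

For example, `streamKey("korvet", "my-topic", 0, "{abc}", 1741852800000L)` yields the bucket key used in the examples below.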
## Message Format

Kafka records are decomposed into Redis Stream fields:

- **Key**: Stored in the `__key` field (if present)
- **Headers**: Stored as `__header.{name}` fields (e.g., `__header.content-type`)
- **Value**: Encoding depends on the detected value type:
  - **JSON**: Top-level fields are flattened into separate stream fields
  - **Raw bytes**: Stored as a single `value` field
-
### JSON Value Example

```text
# Kafka record with JSON value
Key: "user123"
Value: {"name": "Alice", "age": 30}
Headers: content-type=application/json

# Stored in Redis Stream as (bucket key includes hash tag and epoch):
XADD korvet:stream:my-topic:0:{abc}:b1741852800000 * \
    __key "user123" \
    __header.content-type "application/json" \
    name "\"Alice\"" \
    age "30"
```
> JSON values are flattened: each top-level JSON field becomes a separate stream field. Nested objects and arrays are JSON-encoded as byte arrays.
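The flattening rule in the example above can be sketched as follows. This assumes the value has already been parsed into a `Map` and skips string escaping; the class and method names are illustrative, not Korvet's API.

```java
import java.util.*;

public class JsonFlatten {
    // Each top-level entry becomes one stream field; nested structures are
    // re-encoded as JSON text, scalars keep their JSON encoding.
    static Map<String, String> flatten(Map<String, Object> topLevel) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : topLevel.entrySet()) {
            fields.put(e.getKey(), toJson(e.getValue()));
        }
        return fields;
    }

    static String toJson(Object v) {
        if (v == null) return "null";
        if (v instanceof String) return "\"" + v + "\""; // no escaping in this sketch
        if (v instanceof Number || v instanceof Boolean) return v.toString();
        if (v instanceof Map) {
            StringBuilder sb = new StringBuilder("{");
            @SuppressWarnings("unchecked")
            Map<String, Object> m = (Map<String, Object>) v;
            boolean first = true;
            for (Map.Entry<String, Object> e : m.entrySet()) {
                if (!first) sb.append(",");
                sb.append("\"").append(e.getKey()).append("\":").append(toJson(e.getValue()));
                first = false;
            }
            return sb.append("}").toString();
        }
        if (v instanceof List) {
            StringBuilder sb = new StringBuilder("[");
            List<?> list = (List<?>) v;
            for (int i = 0; i < list.size(); i++) {
                if (i > 0) sb.append(",");
                sb.append(toJson(list.get(i)));
            }
            return sb.append("]").toString();
        }
        throw new IllegalArgumentException("unsupported: " + v.getClass());
    }
}
```

With the record from the example, `flatten` produces the fields `name → "\"Alice\""` and `age → "30"`, matching the XADD arguments shown above.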
### Raw Bytes Value Example

```text
# Kafka record with raw bytes
Key: "order456"
Value: <binary data>
Headers: content-type=application/octet-stream

# Stored in Redis Stream as:
XADD korvet:stream:my-topic:0:{abc}:b1741852800000 * \
    __key "order456" \
    __header.content-type "application/octet-stream" \
    value <binary data>
```
## Value Type Detection

Korvet automatically detects the most storage-efficient value type when the first message is produced to a topic. The detection algorithm accounts for Redis Streams' internal delta compression to make optimal choices.

Decision logic:

- **Small messages (<1 KB)**: Uses JSON flattening
  - Benefits from Redis delta compression (field names stored once per ~100 entries)
  - Achieves negative overhead (-19% to -36%) compared to RAW storage
- **Large messages (≥10 KB) with compression enabled**: Uses RAW storage
  - Compression provides significant reduction (up to 51x with ZSTD)
  - Avoids JSON parsing overhead
- **Medium messages (1-10 KB)**: Compares amortized storage sizes
  - Calculates the JSON storage cost, accounting for delta compression
  - Calculates the RAW storage cost with compression
  - Chooses the more efficient option
- **Non-JSON data**: Always uses RAW storage
  - Includes JSON arrays, primitives, and binary data

The detected type is stored in the topic configuration and used for all subsequent messages.
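The decision logic above can be read as a short predicate chain. The thresholds come from the text; the method signature and the pre-computed amortized sizes passed in are illustrative assumptions, not Korvet's internals.

```java
public class ValueTypeDetector {
    enum ValueType { JSON_FLATTENED, RAW }

    static ValueType detect(int sizeBytes, boolean isJsonObject,
                            boolean compressionEnabled,
                            long amortizedJsonBytes, long amortizedRawBytes) {
        // Non-JSON data (arrays, primitives, binary) always goes to RAW.
        if (!isJsonObject) return ValueType.RAW;
        // Small messages benefit from delta compression of field names.
        if (sizeBytes < 1024) return ValueType.JSON_FLATTENED;
        // Large messages with compression enabled favor RAW storage.
        if (sizeBytes >= 10 * 1024 && compressionEnabled) return ValueType.RAW;
        // Medium messages: compare amortized storage costs.
        return amortizedJsonBytes <= amortizedRawBytes
                ? ValueType.JSON_FLATTENED : ValueType.RAW;
    }
}
```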
## Value Mapper Selection

Choosing between JSON flattening and RAW storage with compression significantly impacts memory usage and performance.

### JSON Flattening (Default for JSON Objects)

How it works:

- Top-level JSON fields are extracted and stored as separate Redis Stream fields
- Nested objects and arrays are JSON-encoded as individual field values
- Leverages Redis Streams' listpack encoding for efficient storage

Memory characteristics:

- Efficient for shallow JSON with many small top-level fields
- Redis listpack encoding optimizes storage of multiple small values
- No compression overhead; relies on Redis internal optimizations

Best for:

- ✓ Shallow JSON objects (1-2 levels deep) with 10+ top-level fields
- ✓ Small messages (<1 KB) where compression overhead dominates
- ✓ Varied field values (UUIDs, timestamps, user data)
- ✓ When you need field-level access in Redis (queries, filtering)
- ✓ Random or already-compressed data (images, encrypted data)

Example use cases:

- User profiles: `{"id": 123, "name": "Alice", "email": "…", "age": 30, …}`
- Order records with many top-level attributes
- Event data with a flat structure
### RAW Storage with Compression

How it works:

- The entire value is stored as a single `__value` field
- Optional compression (GZIP, SNAPPY, LZ4, ZSTD) is applied to the field
- Compression is transparent: values are automatically decompressed on read

Memory characteristics:

- ZSTD achieves up to 51x compression on repetitive data
- Compression overhead: ~5-10 bytes per message plus CPU cost
- Most effective on large messages (>10 KB)

Best for:

- ✓ Deeply nested JSON (>2-3 levels) that can't be flattened
- ✓ Large messages (>10 KB) where compression overhead is amortized
- ✓ Repetitive data patterns (logs, telemetry, structured text)
- ✓ JSON with large nested arrays or objects
- ✓ Binary formats with patterns (Protocol Buffers, Avro)

Example use cases:

- Log messages with repeated structure
- Telemetry data with nested metrics
- Large JSON documents with deep nesting
- Event streams with repetitive patterns
### Performance Comparison

Based on production testing with 1000 messages × 100 KB:

| Scenario | JSON Flattened | RAW+ZSTD | Winner |
|---|---|---|---|
| Shallow JSON (many fields) | 49.98 MB | 97.76 MB | JSON (2x better) |
| Deep JSON (10 levels) | 16.81 MB | 0.93 MB | RAW+ZSTD (18x better) |
| Wide JSON (100 fields) | 12.17 MB | 0.93 MB | RAW+ZSTD (13x better) |
### Configuring Storage Compression

Storage compression is configured per-topic and applies to the `__value` field for RAW storage:

```java
NewTopic topic = new NewTopic("logs-topic", 3, (short) 1);
topic.configs(Map.of(
    "value.type", "raw",           // Use RAW storage
    "storage.compression", "zstd"  // Compress with ZSTD
));
admin.createTopics(List.of(topic));
```

Compression types:

- `none`: No compression (default)
- `gzip`: Good compression ratio, slower (26x compression)
- `snappy`: Fast, moderate compression (10x compression)
- `lz4`: Very fast, good compression (14x compression)
- `zstd`: Best compression ratio, fast (51x compression); recommended

> Storage compression is independent of Kafka protocol compression (`compression.type`). Storage compression applies to data at rest in Redis, while protocol compression applies to data in transit between Kafka clients and Korvet.
### Decision Guide

Use this flowchart to choose the right value mapper:

```text
Is your data JSON?
├─ No  → Use RAW + ZSTD compression
└─ Yes
   ├─ Is it deeply nested (>2 levels)?
   │  └─ Yes → Use RAW + ZSTD compression
   └─ No
      ├─ Does it have many top-level fields (>10)?
      │  └─ Yes → Use JSON flattening
      └─ No
         ├─ Is message size large (>10 KB)?
         │  └─ Yes → Use RAW + ZSTD compression
         └─ No → Use JSON flattening
```

> When in doubt, use RAW + ZSTD for large messages (>10 KB) and JSON flattening for small messages (<1 KB). ZSTD compression is very effective on most data types.
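The flowchart can also be expressed as code. This is a sketch of the same checks; the method name, parameters, and return values are illustrative, not a Korvet API.

```java
public class MapperChoice {
    // Mirrors the decision-guide flowchart, top to bottom.
    static String choose(boolean isJson, int nestingDepth,
                         int topLevelFields, int sizeBytes) {
        if (!isJson) return "RAW+ZSTD";               // non-JSON data
        if (nestingDepth > 2) return "RAW+ZSTD";      // deeply nested
        if (topLevelFields > 10) return "JSON_FLATTENING"; // many flat fields
        if (sizeBytes > 10 * 1024) return "RAW+ZSTD"; // large message
        return "JSON_FLATTENING";                     // small, flat JSON
    }
}
```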
## Retention Policies

Korvet enforces retention policies at write time using Redis Streams' MAXLEN and MINID arguments:

- `retention.ms`: Time-based retention (default: 7 days)
- `retention.bytes`: Size-based retention (default: unlimited)
- `compression.type`: Compression type for fetch responses (none, gzip, snappy, lz4, zstd)

### How Retention Works

When producing messages, Korvet applies retention policies using XADD arguments:

- **Time-based**: the `MINID` argument trims messages older than `retention.ms`
- **Size-based**: the `MAXLEN` argument limits the stream to a message count derived from `retention.bytes`

```text
# Example: XADD with exact retention (default)
XADD korvet:stream:my-topic:0 MAXLEN 1000 MINID 1234567890000-0 * __key "..." value "..."

# Example: XADD with approximate retention (when approximate-trimming=true)
XADD korvet:stream:my-topic:0 MAXLEN ~ 1000 MINID 1234567890000-0 * __key "..." value "..."
```
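A sketch of how the trimming arguments shown above might be derived from the topic configs. The `MINID` formula follows directly from Redis Stream IDs being millisecond timestamps; the average-message-size estimate used for `MAXLEN` is an assumption for illustration.

```java
public class RetentionArgs {
    // MINID: trim entries whose ID timestamp is older than now - retention.ms.
    static String minId(long nowMs, long retentionMs) {
        return (nowMs - retentionMs) + "-0";
    }

    // MAXLEN: approximate message-count limit derived from retention.bytes
    // and an estimated average message size (assumed input).
    static long maxLen(long retentionBytes, long avgMessageBytes) {
        return Math.max(1, retentionBytes / avgMessageBytes);
    }
}
```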
### Configuring Retention

Set retention when creating topics via the Kafka Admin API:

```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
AdminClient admin = AdminClient.create(props);

NewTopic topic = new NewTopic("my-topic", 3, (short) 1);
topic.configs(Map.of(
    "retention.ms", "86400000",       // 1 day
    "retention.bytes", "1073741824",  // 1 GB
    "compression.type", "lz4"         // Compress fetch responses with LZ4
));
admin.createTopics(List.of(topic));
```

Or configure defaults in `application.yml`:

```yaml
korvet:
  topics:
    retention-time: 7d
    retention-bytes: 10GB
    compression: lz4
```
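The shorthand defaults such as `7d` and `10GB` map to the numeric `retention.ms` and `retention.bytes` values. A sketch of that conversion; the set of supported suffixes here is an assumption, not Korvet's documented grammar.

```java
public class ConfigUnits {
    // "7d" -> 604800000 ms; supported suffixes (s/m/h/d) are assumed.
    static long durationMs(String s) {
        long n = Long.parseLong(s.substring(0, s.length() - 1));
        switch (s.charAt(s.length() - 1)) {
            case 's': return n * 1000L;
            case 'm': return n * 60_000L;
            case 'h': return n * 3_600_000L;
            case 'd': return n * 86_400_000L;
            default: throw new IllegalArgumentException(s);
        }
    }

    // "10GB" -> 10737418240 bytes; binary units assumed.
    static long sizeBytes(String s) {
        if (s.endsWith("GB")) return Long.parseLong(s.substring(0, s.length() - 2)) * 1_073_741_824L;
        if (s.endsWith("MB")) return Long.parseLong(s.substring(0, s.length() - 2)) * 1_048_576L;
        if (s.endsWith("KB")) return Long.parseLong(s.substring(0, s.length() - 2)) * 1_024L;
        return Long.parseLong(s); // plain byte count
    }
}
```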
## Performance Tuning

### Async Operations

Korvet uses asynchronous Redis operations for maximum throughput:

- All Redis commands use Lettuce's async API (`RedisFuture`)
- Operations return `CompletableFuture` to avoid blocking
- Multiple operations execute in parallel
- Netty event loop threads remain non-blocking

Benefits:

- Higher throughput with fewer threads
- Better resource utilization
- Reduced latency under load

### Pipelining

Korvet automatically batches Redis operations using Lettuce's command pipelining:

- Multiple commands are batched together
- `setAutoFlushCommands(false)` delays command execution
- `flushCommands()` sends all commands in a single network round-trip
- Significantly improves throughput for high-volume producers

Example: producing 1000 messages sends 1000 XADD commands in a single pipeline instead of 1000 round-trips.
### Connection Pooling

- Configurable pool size based on workload
## Compression Types

Korvet supports two types of compression:

### 1. Storage Compression (New)

Compresses individual message values at rest in Redis Streams:

- **Where**: Applied to the `__value` field in Redis Streams
- **When**: At write time (produce) and read time (fetch)
- **Configuration**: `storage.compression` topic config (none, gzip, snappy, lz4, zstd)
- **Use case**: Reduce Redis memory usage for large or repetitive data

How it works:

- Producer writes: the value is compressed before being stored in Redis
- Consumer reads: the value is decompressed when fetched from Redis
- Transparent to Kafka clients: they receive uncompressed data
- Independent of Kafka protocol compression

Benefits:

- Reduces Redis memory usage by up to 51x (ZSTD)
- Lower storage costs
- Faster Redis persistence (smaller AOF/RDB files)
- No impact on Kafka client compatibility

See Value Mapper Selection for guidance on when to use storage compression.
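The compress-on-write / decompress-on-read cycle can be illustrated with GZIP, the one supported codec available in the JDK without extra dependencies (Snappy, LZ4, and ZSTD require third-party libraries). This is a self-contained sketch of the transparency property only; Korvet's actual codecs and framing are not shown.

```java
import java.io.*;
import java.util.zip.*;

public class StorageCodec {
    // Producer path: compress the value before it is written to __value.
    static byte[] compress(byte[] raw) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(raw);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Consumer path: decompress on fetch, so Kafka clients see original bytes.
    static byte[] decompress(byte[] stored) {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(stored))) {
            return gz.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Repetitive payloads (logs, telemetry) shrink substantially through this cycle, while the round-trip always restores the exact original bytes.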
### 2. Protocol Compression (Kafka Standard)

Compresses Kafka protocol messages between clients and Korvet:

- **Where**: Applied to Kafka Fetch/Produce request/response payloads
- **When**: During network transmission
- **Configuration**: `compression.type` topic config (none, gzip, snappy, lz4, zstd)
- **Use case**: Reduce network bandwidth between Kafka clients and Korvet

How it works:

- **Producer side**: Kafka clients can send compressed or uncompressed batches; Korvet decompresses them before storing
- **Consumer side**: Korvet compresses fetch responses based on the topic's `compression.type` configuration
- **Storage**: Messages are stored uncompressed in Redis (unless storage compression is enabled)

Benefits:

- Reduces network bandwidth for fetch responses
- Transparent to clients: works with all Kafka clients
- Flexible per-topic configuration
- Standard Kafka feature
Compression Comparison
| Feature | Storage Compression | Protocol Compression |
|---|---|---|
Purpose |
Reduce Redis memory usage |
Reduce network bandwidth |
Applied at |
Redis storage layer |
Kafka protocol layer |
Configuration |
|
|
Affects |
Redis memory, persistence |
Network traffic |
Transparent to |
Kafka clients |
Storage layer |
Recommended for |
Large messages, repetitive data |
High-throughput consumers |
You can use both types of compression together! For example, use storage.compression=zstd to save Redis memory and compression.type=lz4 for fast network compression.
|
See Compression for protocol compression configuration details.