| For the latest stable version, please use Korvet 0.12.5! |
Redis Streams Storage
Korvet uses Redis Streams as its primary storage layer for all messages.
Why Redis Streams?
- Low latency: Sub-millisecond read/write performance
- Consumer groups: Built-in support for coordinated consumption
- Persistence: AOF and RDB for durability
- Scalability: Handle millions of messages per second
Stream Structure
Each topic partition maps to a Redis Stream:
- Stream key: {keyspace}:stream:{topic}:{partition} (default keyspace: korvet)
- Message ID: Redis-generated timestamp-based ID (format: timestamp-sequence)
- Fields: Kafka record components mapped to stream fields
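As a concrete sketch of the key layout above (the class and helper names here are illustrative, not Korvet's actual code):

```java
public class StreamKey {
    // Builds the stream key {keyspace}:stream:{topic}:{partition}
    static String streamKey(String keyspace, String topic, int partition) {
        return keyspace + ":stream:" + topic + ":" + partition;
    }

    public static void main(String[] args) {
        // Default keyspace "korvet", topic "my-topic", partition 0
        System.out.println(streamKey("korvet", "my-topic", 0));
        // → korvet:stream:my-topic:0
    }
}
```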
Message Format
Kafka records are decomposed into Redis Stream fields:
- Key: Stored in __key field (if present)
- Headers: Stored as __header.{name} fields (e.g., __header.content-type)
- Value: Encoding depends on detected value type:
  - JSON: Top-level fields are flattened into separate stream fields
  - Raw bytes: Stored as single value field
JSON Value Example
# Kafka record with JSON value
Key: "user123"
Value: {"name": "Alice", "age": 30}
Headers: content-type=application/json
# Stored in Redis Stream as:
XADD korvet:stream:my-topic:0 * \
__key "user123" \
__header.content-type "application/json" \
name "\"Alice\"" \
age "30"
| JSON values are flattened - each top-level JSON field becomes a separate stream field. Nested objects and arrays are JSON-encoded as byte arrays. |
Raw Bytes Value Example
# Kafka record with raw bytes
Key: "order456"
Value: <binary data>
Headers: content-type=application/octet-stream
# Stored in Redis Stream as:
XADD korvet:stream:my-topic:0 * \
__key "order456" \
__header.content-type "application/octet-stream" \
value <binary data>
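The two layouts above amount to a single mapping from record parts to stream fields. The sketch below is illustrative only (class and method names are invented, and JSON parsing is elided: top-level JSON fields are passed in already encoded as strings):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RecordMapper {
    // Maps Kafka record components to Redis Stream fields per the
    // layout above: __key, __header.{name}, and flattened JSON fields.
    static Map<String, String> toStreamFields(String key,
                                              Map<String, String> headers,
                                              Map<String, String> jsonTopLevel) {
        Map<String, String> fields = new LinkedHashMap<>();
        if (key != null) fields.put("__key", key);
        headers.forEach((name, v) -> fields.put("__header." + name, v));
        fields.putAll(jsonTopLevel); // already JSON-encoded strings
        return fields;
    }

    public static void main(String[] args) {
        Map<String, String> fields = toStreamFields(
                "user123",
                Map.of("content-type", "application/json"),
                Map.of("name", "\"Alice\"", "age", "30"));
        System.out.println(fields.get("__key")); // → user123
    }
}
```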
Retention Policies
Korvet enforces retention policies at write time using Redis Stream’s MAXLEN and MINID arguments:
- retention.ms: Time-based retention (default: 7 days)
- retention.bytes: Size-based retention (default: unlimited)
- compression.type: Compression type for fetch responses (none, gzip, snappy, lz4, zstd)
How Retention Works
When producing messages, Korvet applies retention policies using XADD arguments:
- Time-based: MINID argument trims messages older than retention.ms
- Size-based: MAXLEN argument limits the stream to an approximate message count based on retention.bytes
# Example: XADD with retention
XADD korvet:stream:my-topic:0 MAXLEN ~ 1000 MINID 1234567890000-0 * __key "..." value "..."
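To make the time-based trim concrete: because stream IDs are timestamp-sequence, the MINID cutoff is simply the current time minus retention.ms. A minimal sketch (method name is illustrative; Korvet's internals may differ):

```java
public class RetentionTrim {
    // Derives the MINID threshold for time-based retention:
    // entries with IDs older than (now - retention.ms) are trimmed.
    static String minId(long nowMillis, long retentionMs) {
        return (nowMillis - retentionMs) + "-0";
    }

    public static void main(String[] args) {
        long sevenDaysMs = 7L * 24 * 60 * 60 * 1000; // default retention.ms
        // Fixed "now" for a reproducible example
        System.out.println(minId(1700000000000L, sevenDaysMs));
        // → 1699395200000-0
    }
}
```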
Configuring Retention
Set retention when creating topics via Kafka Admin API:
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
AdminClient admin = AdminClient.create(props);
NewTopic topic = new NewTopic("my-topic", 3, (short) 1);
topic.configs(Map.of(
    "retention.ms", "86400000",       // 1 day
    "retention.bytes", "1073741824",  // 1 GB
    "compression.type", "lz4"         // Compress fetch responses with LZ4
));
admin.createTopics(List.of(topic));
Or configure defaults in application.yml:
korvet:
server:
default-retention-time: 7d
default-retention-size: 10GB
default-compression: lz4
Performance Tuning
Async Operations
Korvet uses asynchronous Redis operations for maximum throughput:
- All Redis commands use Lettuce's async API (RedisFuture)
- Operations return CompletableFuture to avoid blocking
- Multiple operations execute in parallel
- Netty event loop threads remain non-blocking

Benefits:
- Higher throughput with fewer threads
- Better resource utilization
- Reduced latency under load
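The pattern can be illustrated with plain CompletableFutures (a stand-in sketch, not Korvet's code; Lettuce's RedisFuture implements CompletionStage, so the real calls compose the same way):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class AsyncSketch {
    // Stand-in for an async XADD: completes off the calling thread.
    static CompletableFuture<String> fakeXadd(int i) {
        return CompletableFuture.supplyAsync(() -> "1700000000000-" + i);
    }

    public static void main(String[] args) {
        // Issue several operations without blocking between them...
        List<CompletableFuture<String>> pending = IntStream.range(0, 3)
                .mapToObj(AsyncSketch::fakeXadd)
                .collect(Collectors.toList());
        // ...then wait once for all of them to finish.
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        pending.forEach(f -> System.out.println(f.join()));
    }
}
```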
Pipelining
Korvet automatically batches Redis operations using Lettuce’s command pipelining:
- Multiple commands are batched together
- setAutoFlushCommands(false) delays command execution
- flushCommands() sends all commands in a single network round-trip
- Significantly improves throughput for high-volume producers
Example: Producing 1000 messages sends 1000 XADD commands in a single pipeline instead of 1000 round-trips.
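A sketch of that pipeline using Lettuce directly (illustrative of the mechanism, not Korvet's code; requires a running Redis at localhost:6379):

```java
import io.lettuce.core.LettuceFutures;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class PipelineSketch {
    public static void main(String[] args) {
        RedisClient client = RedisClient.create("redis://localhost:6379");
        StatefulRedisConnection<String, String> conn = client.connect();
        RedisAsyncCommands<String, String> async = conn.async();

        conn.setAutoFlushCommands(false); // queue commands locally
        List<RedisFuture<String>> ids = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            ids.add(async.xadd("korvet:stream:my-topic:0",
                    Map.of("__key", "k" + i, "value", "v" + i)));
        }
        conn.flushCommands(); // all 1000 XADDs go out together

        LettuceFutures.awaitAll(5, TimeUnit.SECONDS,
                ids.toArray(new RedisFuture[0]));
        conn.close();
        client.shutdown();
    }
}
```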
Connection Pooling
Korvet uses Apache Commons Pool for Redis connection management:
korvet:
redis:
pool:
max-active: 10 # Maximum connections
max-idle: 10 # Maximum idle connections
min-idle: 2 # Minimum idle connections
max-wait: -1ms # Wait time when pool exhausted (-1 = indefinite)
time-between-eviction-runs: 60s # Idle connection cleanup interval
Benefits:
- Reuses connections across requests
- Reduces connection overhead
- Configurable pool size based on workload
Compression
Korvet implements server-side compression to reduce network bandwidth between the server and consumers:
Storage:
- Messages are always stored uncompressed in Redis Streams
- This enables better queryability and avoids decompression overhead in the storage layer
- Each record is stored as individual fields in the stream

Producer side:
- Kafka clients can send compressed or uncompressed batches
- Korvet automatically decompresses incoming batches using Kafka's API
- Individual records are extracted and stored uncompressed in Redis

Consumer side:
- When consumers fetch messages, Korvet compresses the response based on the topic's compression.type configuration
- Compression is applied per-topic, not per-producer
- Supported types: NONE, GZIP, SNAPPY, LZ4, ZSTD

Benefits:
- Reduces network bandwidth for fetch responses
- Transparent to clients - works with all Kafka clients
- Flexible per-topic configuration
- No impact on storage layer performance
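For compression.type=gzip, the consumer-side step amounts to compressing the fetch response body. A self-contained round-trip using the JDK's GZIP streams (illustrative only, not Korvet's actual fetch path):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class FetchCompression {
    // Compresses a fetch response body, as when compression.type=gzip.
    static byte[] gzip(byte[] data) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(data);
        }
        return bos.toByteArray();
    }

    // What the consumer side does transparently on receipt.
    static byte[] gunzip(byte[] data) throws Exception {
        try (GZIPInputStream gz =
                     new GZIPInputStream(new ByteArrayInputStream(data))) {
            return gz.readAllBytes();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] original = "{\"name\":\"Alice\",\"age\":30}"
                .getBytes(StandardCharsets.UTF_8);
        byte[] compressed = gzip(original);
        System.out.println(new String(gunzip(compressed), StandardCharsets.UTF_8));
        // → {"name":"Alice","age":30}
    }
}
```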
See Compression for configuration details.