Redis Streams (Hot Storage)
Korvet uses Redis Streams as the hot storage tier for recent messages.
Why Redis Streams?
- Low latency: Sub-millisecond read/write performance
- Consumer groups: Built-in support for coordinated consumption
- Persistence: AOF and RDB for durability
- Scalability: Handle millions of messages per second
Stream Structure
Each topic partition maps to a Redis Stream:
- Stream key: {keyspace}:{topic}:{partition}, where the default tenant's keyspace is default (e.g., default:my-topic:0)
- Message ID: Redis-generated, timestamp-based ID in the format <milliseconds>-<sequence>
- Fields: Kafka record components mapped to stream fields
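The key layout and ID format above can be sketched as follows. The StreamKeys class and its methods are hypothetical helpers for illustration, not Korvet's API. The sketch assumes only what is stated here: keys are joined with ":" and Redis entry IDs have the form <milliseconds>-<sequence>.

```java
// Hypothetical helper illustrating the stream key layout and Redis entry IDs.
public final class StreamKeys {

    // Builds {keyspace}:{topic}:{partition}, e.g. default:my-topic:0
    static String streamKey(String keyspace, String topic, int partition) {
        return keyspace + ":" + topic + ":" + partition;
    }

    // A Redis stream entry ID has the form <milliseconds>-<sequence>
    record EntryId(long timestampMs, long sequence) {
        static EntryId parse(String id) {
            int dash = id.indexOf('-');
            if (dash < 0) {
                throw new IllegalArgumentException("Not a stream entry ID: " + id);
            }
            return new EntryId(Long.parseLong(id.substring(0, dash)),
                               Long.parseLong(id.substring(dash + 1)));
        }
    }

    public static void main(String[] args) {
        System.out.println(streamKey("default", "my-topic", 0));
        EntryId id = EntryId.parse("1700000000000-3");
        System.out.println(id.timestampMs() + " " + id.sequence());
    }
}
```

Because the ID begins with a millisecond timestamp, entries within a partition stream are ordered by arrival time. The age-based trimming described under Retention Policies relies on this property.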
Message Format
Kafka records are decomposed into Redis Stream fields:
- Key: Stored in the __key field (if present)
- Headers: Stored as __header.{name} fields (e.g., __header.content-type)
- Value: JSON values are flattened; each top-level JSON field becomes a stream field
# Kafka record with JSON value
Key: "user123"
Value: {"name": "Alice", "age": 30}
Headers: content-type=application/json
# Stored in Redis Stream as:
XADD default:my-topic:0 * \
__key "user123" \
__header.content-type "application/json" \
name "\"Alice\"" \
age "30"
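Value fields are stored as JSON-encoded bytes. The value {"name": "Alice", "age": 30} is flattened into separate name and age fields, and each holds its own JSON encoding: the string Alice is stored with its quotes ("Alice"), and the number is stored as 30.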
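The decomposition can be sketched in plain Java. This is a minimal illustration, not Korvet's implementation. It assumes the JSON value has already been parsed into a Map, and it uses String keys and header values for readability, whereas Kafka carries these as bytes. The RecordFlattener class, its methods, and the small JSON encoder are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the field layout described above; assumes the record
// value has already been parsed from JSON into a Map.
public final class RecordFlattener {

    static Map<String, String> toStreamFields(String key, Map<String, String> headers,
                                              Map<String, Object> jsonValue) {
        Map<String, String> fields = new LinkedHashMap<>();
        if (key != null) {
            fields.put("__key", key); // key stored as-is
        }
        headers.forEach((name, v) -> fields.put("__header." + name, v));
        // Each top-level JSON field becomes its own stream field, JSON-encoded
        jsonValue.forEach((name, v) -> fields.put(name, toJson(v)));
        return fields;
    }

    // Minimal JSON encoder for the types a parsed JSON value can contain
    static String toJson(Object v) {
        if (v == null) return "null";
        if (v instanceof String s) return quote(s);
        if (v instanceof Number || v instanceof Boolean) return v.toString();
        if (v instanceof Map<?, ?> m) {
            StringBuilder sb = new StringBuilder("{");
            for (Map.Entry<?, ?> e : m.entrySet()) {
                if (sb.length() > 1) sb.append(',');
                sb.append(quote(String.valueOf(e.getKey()))).append(':').append(toJson(e.getValue()));
            }
            return sb.append('}').toString();
        }
        if (v instanceof List<?> l) {
            StringBuilder sb = new StringBuilder("[");
            for (Object item : l) {
                if (sb.length() > 1) sb.append(',');
                sb.append(toJson(item));
            }
            return sb.append(']').toString();
        }
        throw new IllegalArgumentException("Unsupported JSON type: " + v.getClass());
    }

    // Quotes a string and escapes the characters JSON requires
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"' -> sb.append("\\\"");
                case '\\' -> sb.append("\\\\");
                case '\n' -> sb.append("\\n");
                case '\r' -> sb.append("\\r");
                case '\t' -> sb.append("\\t");
                default -> {
                    if (c < 0x20) sb.append(String.format("\\u%04x", (int) c));
                    else sb.append(c);
                }
            }
        }
        return sb.append('"').toString();
    }

    public static void main(String[] args) {
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("name", "Alice");
        value.put("age", 30);
        Map<String, String> fields = toStreamFields("user123",
                Map.of("content-type", "application/json"), value);
        fields.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

Running main on the example record produces the same four fields as the XADD command above. The stored bytes would be these strings encoded as UTF-8, which is an assumption of this sketch.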
Retention Policies
Note: Retention policy enforcement is currently in development. Topic configurations can be set via Kafka’s CreateTopics API, but they are not yet enforced on Redis Streams.
Topic retention and compression settings are stored but not yet enforced:
- retention.ms: Time-based retention (default: 7 days); not enforced
- retention.bytes: Size-based retention (default: unlimited); not enforced
- compression.type: Compression type metadata (none, gzip, snappy, lz4, zstd); stored only
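For example, these settings can be supplied when creating a topic with the Kafka AdminClient:
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
// try-with-resources closes the admin client when done
try (AdminClient admin = AdminClient.create(props)) {
    NewTopic topic = new NewTopic("my-topic", 3, (short) 1); // 3 partitions, replication factor 1
    topic.configs(Map.of(
        "retention.ms", "86400000",      // 1 day (stored but not enforced)
        "retention.bytes", "1073741824", // 1 GB (stored but not enforced)
        "compression.type", "lz4"        // Metadata only
    ));
    // createTopics() is asynchronous; block until the request is acknowledged
    admin.createTopics(List.of(topic)).all().get();
}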
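Until enforcement is available, you can manage retention manually with the Redis XTRIM command. Each partition is a separate stream, so run the command against every partition key:
# Trim by count: keep at least the last 1000 entries (~ lets Redis trim approximately, which is more efficient)
XTRIM default:my-topic:0 MAXLEN ~ 1000
# Trim by age: evict entries with IDs below <timestamp-in-ms>-0, where the timestamp is now minus one hour (MINID requires Redis 6.2+)
XTRIM default:my-topic:0 MINID <timestamp-in-ms>-0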
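The MINID cutoff is simply the current time in milliseconds minus the retention window, followed by "-0". The sketch below builds those commands for every partition of a topic. It assumes the {keyspace}:{topic}:{partition} key layout and a known partition count. The TrimCommands class is hypothetical, and it only produces command strings; it does not connect to Redis.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: builds XTRIM MINID commands that drop entries older than
// maxAgeMs from every partition stream of a topic (MINID needs Redis 6.2+).
public final class TrimCommands {

    static String minIdFor(long nowMs, long maxAgeMs) {
        // Entry IDs start with a millisecond timestamp, so <cutoff>-0 is the
        // smallest ID that is still kept; anything lower is evicted.
        return (nowMs - maxAgeMs) + "-0";
    }

    static List<String> trimByAge(String keyspace, String topic, int partitions,
                                  long nowMs, long maxAgeMs) {
        String minId = minIdFor(nowMs, maxAgeMs);
        List<String> commands = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            commands.add("XTRIM " + keyspace + ":" + topic + ":" + p + " MINID " + minId);
        }
        return commands;
    }

    public static void main(String[] args) {
        long oneHourMs = 60L * 60 * 1000;
        trimByAge("default", "my-topic", 3, System.currentTimeMillis(), oneHourMs)
                .forEach(System.out::println);
    }
}
```

Computing one cutoff and reusing it for all partitions keeps the trim consistent across the topic, even if the commands are sent a few milliseconds apart.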
Performance Tuning
Pipelining
Korvet automatically batches XADD operations using Lettuce’s command pipelining:
- Multiple produce requests are batched together
- setAutoFlushCommands(false) buffers commands instead of sending each one immediately
- flushCommands() sends all buffered commands in a single network round trip
- Fewer round trips improve throughput for high-volume producers
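The batching idea above is modeled below without any library or Redis connection. Commands are queued, as with setAutoFlushCommands(false), and written together on flush(), as with flushCommands(). The XaddBatcher class and its transport callback are hypothetical stand-ins. In Korvet the real path goes through a Lettuce connection, which this sketch does not open.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Library-free model of pipelined XADD batching. "Sending" is a callback, so the
// example only shows how many queued commands share one flush.
public final class XaddBatcher {

    record Xadd(String streamKey, Map<String, String> fields) {}

    private final List<Xadd> pending = new ArrayList<>();
    private final Consumer<List<Xadd>> transport; // stands in for the network write

    XaddBatcher(Consumer<List<Xadd>> transport) {
        this.transport = transport;
    }

    // Like issuing a command with auto-flush disabled: queued, not yet sent
    void add(String streamKey, Map<String, String> fields) {
        pending.add(new Xadd(streamKey, fields));
    }

    // Like flushCommands(): hand everything queued to the transport in one batch
    void flush() {
        if (pending.isEmpty()) return;
        transport.accept(List.copyOf(pending));
        pending.clear();
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        XaddBatcher batcher = new XaddBatcher(batch -> batchSizes.add(batch.size()));
        for (int i = 0; i < 100; i++) {
            batcher.add("default:my-topic:" + (i % 3), Map.of("n", String.valueOf(i)));
        }
        batcher.flush();
        System.out.println("flushes: " + batchSizes.size() + ", commands: " + batchSizes.get(0));
    }
}
```

Here 100 XADD commands across three partition streams share a single flush, where sending each one immediately would need 100 round trips.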
Connection Pooling
Korvet uses Apache Commons Pool for Redis connection management:
korvet:
redis:
pool:
max-active: 10 # Maximum connections
max-idle: 10 # Maximum idle connections
min-idle: 2 # Minimum idle connections
max-wait: -1ms # Wait time when pool exhausted (-1 = indefinite)
time-between-eviction-runs: 60s # Idle connection cleanup interval
Benefits:
- Reuses connections across requests
- Reduces connection overhead
- Pool size can be tuned to the workload
Compression
Note: Message compression is not currently implemented in Korvet. Messages are stored uncompressed in Redis Streams.
The compression.type topic configuration is stored as metadata but not used:
- Kafka clients can compress messages before sending (client-side compression)
- Korvet stores the compressed bytes as-is in Redis
- Consumers receive the same compressed bytes and decompress them client-side
- No server-side compression or decompression is performed
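The pass-through behavior can be illustrated with a small model. GZIP from java.util.zip stands in for a client codec, and a HashMap stands in for the Redis Stream. Real Kafka clients compress whole record batches rather than single values, so this simplification shows only that the stored bytes are never touched on the server side. The CompressionPassThrough class is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Illustrative model: the producer compresses, the store keeps the bytes
// untouched, and the consumer decompresses.
public final class CompressionPassThrough {

    static byte[] gzip(byte[] data) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray(); // complete once the GZIP stream is closed
    }

    static byte[] gunzip(byte[] data) {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(data))) {
            return gz.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] original = "{\"name\": \"Alice\", \"age\": 30}".getBytes(StandardCharsets.UTF_8);

        byte[] sent = gzip(original);                // client-side compression
        Map<String, byte[]> store = new HashMap<>(); // stands in for the Redis Stream
        store.put("payload", sent);                  // stored as-is, no server-side work

        byte[] received = store.get("payload");      // consumer gets identical bytes
        byte[] restored = gunzip(received);          // client-side decompression
        System.out.println(new String(restored, StandardCharsets.UTF_8));
    }
}
```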