# Cold Storage (Delta Lake)

Korvet includes a built-in archival service that continuously archives messages from Redis to Delta Lake for long-term retention.

## Overview

The cold storage archival service:

- Reads messages from Redis (hot or cold tier) using consumer groups
- Writes them to Delta Lake tables in Parquet format with ZSTD compression
- Supports S3, HDFS, or local filesystem storage
- Runs as part of the Korvet server, so no additional infrastructure is required

## Configuration

Enable cold storage by setting the `delta-path` property in your `application.yml`:

```yaml
korvet:
  server:
    delta-path: s3a://my-bucket/korvet/delta
    s3:
      region: us-west-1
    delta-archiver:
      consumer-group: korvet-delta-archiver
```

Cold storage is automatically enabled when `delta-path` is set. There is no separate `enabled` property.
### Configuration Properties

| Property | Default | Description |
|---|---|---|
| `delta-path` | required | Delta Lake storage path. Supports S3 (`s3a://`), HDFS, and local filesystem paths |
| `s3.region` | required for S3 | AWS region (e.g., `us-west-1`) |
| | none | Custom S3 endpoint (for MinIO or LocalStack) |
| `delta-archiver.consumer-group` | | Consumer group name for tracking archival progress |
### Tuning Properties

| Property | Default | Description |
|---|---|---|
| `read-worker-count` | | Number of parallel readers. Streams are partitioned across workers using consistent hashing. |
| | | Messages per Redis `XREADGROUP` call |
| | | Parquet files per Delta Lake transaction |
| | | Block duration for `XREADGROUP` in milliseconds |
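The table above notes that streams are partitioned across read workers by consistent hashing. Korvet's exact algorithm is internal; the sketch below only illustrates the general idea of a stable hash-based stream-to-worker assignment (the `assign_worker` helper and the MD5 choice are assumptions, and a full consistent-hash ring would additionally use virtual nodes to minimize reassignment when the worker count changes):

```python
import hashlib

def assign_worker(stream_key: str, worker_count: int) -> int:
    """Map a stream key to a worker index via a stable hash.

    A cryptographic hash keeps the assignment identical across
    processes and restarts (unlike Python's randomized hash()).
    """
    digest = hashlib.md5(stream_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % worker_count

# Each worker then reads only the streams that hash to its index.
streams = [f"korvet/stream/orders/{p}" for p in range(8)]
by_worker: dict[int, list[str]] = {}
for s in streams:
    by_worker.setdefault(assign_worker(s, 4), []).append(s)
```

Because the assignment depends only on the stream key and the worker count, every worker can compute it independently with no coordination.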
## Per-Topic Retention Configuration

Control when data moves from hot/cold tiers to cold storage using topic-level configuration:

| Configuration | Default | Description |
|---|---|---|
| `remote.storage.enable` | | Enable tiered storage for this topic (Kafka KIP-405 standard) |
| `local.retention.ms` | | Time to keep in the hot tier before archiving |
| `local.retention.bytes` | | Size to keep in the local tier before archiving |
| `retention.ms` | | Total retention across all tiers. Data is deleted after this time (Kafka standard) |

Remote tier retention is implicit: `remote_retention = retention.ms - local.retention.ms`
Example: Keep 1 day local, ~364 days remote (1 year total):

```bash
kafka-configs --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name my-topic --alter \
  --add-config remote.storage.enable=true,retention.ms=31536000000,local.retention.ms=86400000
```
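The arithmetic behind those two millisecond values can be checked in a couple of lines:

```python
DAY_MS = 24 * 60 * 60 * 1000        # 86_400_000 ms in one day

retention_ms = 31_536_000_000        # retention.ms: total retention
local_retention_ms = 86_400_000      # local.retention.ms: hot tier

# Remote retention is implicit: everything beyond the local tier.
remote_retention_ms = retention_ms - local_retention_ms

print(retention_ms / DAY_MS)         # 365.0 days total
print(remote_retention_ms / DAY_MS)  # 364.0 days remote
```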
## Storage Format

### Delta Lake Table Structure

Messages are stored in Delta Lake tables organized by stream:

```
s3a://my-bucket/korvet/delta/
└── korvet/stream/orders/0/          # Topic "orders", partition 0
    ├── _delta_log/                  # Delta transaction log
    │   ├── 00000000000000000000.json
    │   └── ...
    ├── part-xxxxx.zstd.parquet      # Data files
    └── ...
```
### Parquet Schema

Each Parquet file contains the following columns:

| Column | Type | Description |
|---|---|---|
| | `STRING` | Redis stream key |
| `message_id` | `STRING` | Redis stream message ID (e.g., `1774849698423-0`) |
| `timestamp` | `LONG` | Message timestamp in milliseconds |
| `fields` | `MAP<STRING, BINARY>` | Message fields as key-value pairs |
## AWS Authentication

The archival service uses AWS SDK v2 (via Hadoop 3.4+) and supports multiple authentication methods.

### Credential Types

| Type | Description |
|---|---|
| `auto` | Auto-detect credentials. Tries in order: static credentials in config, IRSA (EKS), IMDS (EC2/ECS), environment variables. |
| `iam` | EC2 Instance Metadata Service (IMDS) only. Use on EC2, ECS, or EKS with node IAM roles. |
| `irsa` | EKS IAM Roles for Service Accounts. Uses the `AWS_WEB_IDENTITY_TOKEN_FILE` OIDC token. |
| `static` | Explicit access key and secret. Use for development, MinIO, or LocalStack. |
### IRSA (EKS IAM Roles for Service Accounts)

For EKS deployments, IRSA provides pod-level IAM permissions without sharing node-level credentials.

```yaml
korvet:
  storage:
    remote:
      path: s3a://my-bucket/korvet/delta
      s3:
        region: us-east-1
        credentials:
          type: irsa  # Or use 'auto' (default), which auto-detects IRSA
```

When IRSA is configured on your EKS cluster, the following environment variables are automatically injected into pods:

- `AWS_ROLE_ARN`: the IAM role ARN to assume
- `AWS_WEB_IDENTITY_TOKEN_FILE`: path to the OIDC token file

Korvet will automatically detect these and use `WebIdentityTokenCredentialsProvider`.
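Korvet's detection logic is internal, but the check it performs can be sketched roughly as follows (the `irsa_available` helper is illustrative, not Korvet's code; it mirrors the `irsaDetected` and `tokenFileExists` fields shown by the diagnostics endpoint below):

```python
import os

def irsa_available(env=os.environ) -> bool:
    """True when the IRSA-injected variables are present and the
    OIDC token file actually exists on disk."""
    role_arn = env.get("AWS_ROLE_ARN")
    token_file = env.get("AWS_WEB_IDENTITY_TOKEN_FILE")
    return bool(role_arn) and bool(token_file) and os.path.isfile(token_file)
```

If either variable is missing, or the token file path does not exist, credential resolution falls through to the next provider in the `auto` chain.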
### IAM Role (EC2/ECS/EKS Nodes)

On EC2, ECS, or EKS with node-level IAM roles, the service automatically uses the instance/task/pod IAM role via IMDS:

```yaml
korvet:
  storage:
    remote:
      path: s3a://my-bucket/korvet/delta
      s3:
        region: us-west-1
        credentials:
          type: iam  # Explicit IMDS, or use 'auto' (default)
```
## Troubleshooting S3 Access

Korvet provides a diagnostic endpoint to troubleshoot S3 configuration and connectivity issues.

### S3 Diagnostics Endpoint

Access the endpoint at `/actuator/s3diagnostics`:

```bash
curl http://localhost:8080/actuator/s3diagnostics | jq .
```

Example response:

```json
{
  "timestamp": "2026-03-19T10:30:00Z",
  "configuration": {
    "remoteStorageConfigured": true,
    "path": "s3a://my-bucket/delta",
    "region": "us-east-1",
    "credentialsType": "AUTO",
    "environment": {
      "irsaDetected": true,
      "AWS_ROLE_ARN": "arn:aws:iam::123456789012:role/korvet-s3-role",
      "AWS_WEB_IDENTITY_TOKEN_FILE": "/var/run/secrets/...",
      "tokenFileExists": true,
      "staticCredentialsDetected": false
    }
  },
  "bucket": "my-bucket",
  "prefix": "delta",
  "identity": {
    "status": "OK",
    "account": "123456789012",
    "arn": "arn:aws:sts::123456789012:assumed-role/korvet-s3-role/...",
    "userId": "AROA..."
  },
  "s3Access": {
    "headBucket": {
      "status": "OK",
      "message": "Bucket exists and is accessible"
    },
    "listObjects": {
      "status": "OK",
      "objectCount": 5,
      "message": "ListBucket permission verified"
    }
  },
  "status": "OK"
}
```
### Common Issues

- **Region not configured**: Ensure `korvet.storage.remote.s3.region` is set, or set the `AWS_REGION` environment variable.
- **IRSA not detected**: Verify that the `AWS_ROLE_ARN` and `AWS_WEB_IDENTITY_TOKEN_FILE` environment variables are set and that the token file exists.
- **Permission denied (403)**: The IAM role/user lacks required S3 permissions. Required permissions:
  - `s3:ListBucket` on `arn:aws:s3:::bucket-name`
  - `s3:GetObject` on `arn:aws:s3:::bucket-name/*`
  - `s3:PutObject` on `arn:aws:s3:::bucket-name/*`
  - `s3:DeleteObject` on `arn:aws:s3:::bucket-name/*`
- **Identity shows wrong role**: On EKS, ensure the ServiceAccount is annotated with `eks.amazonaws.com/role-arn` and the pod is using that ServiceAccount.
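The four permissions listed above map onto a minimal IAM policy such as the following (this is a generic policy sketch, not shipped with Korvet; `bucket-name` is a placeholder for your bucket):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "s3:ListBucket",
      "Resource": "arn:aws:s3:::bucket-name"
    },
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
      "Resource": "arn:aws:s3:::bucket-name/*"
    }
  ]
}
```

Note that `s3:ListBucket` applies to the bucket ARN itself, while the object actions apply to the `/*` object ARN; mixing these up is a common cause of 403 errors.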
## Performance

The archival service achieves high throughput when archiving to same-region S3:

| Configuration | Throughput | Notes |
|---|---|---|
| Single stream | ~32,000 msg/s | Baseline performance |
| 4 streams (parallel) | ~115,000 msg/s | Near-linear scaling |

See Cold Storage Benchmarks for detailed results.
### Performance Tips

- **Same-region S3**: Deploy Korvet in the same AWS region as your S3 bucket
- **Multiple streams**: Use multiple topic partitions for parallel archival
- **Batch size**: Larger batches (10,000+ messages) improve throughput
- **Worker count**: Match `read-worker-count` and `commit-worker-count` to your stream count
## Reading Archived Data

Archived data can be read using:

- **Korvet CLI**: Export to JSON Lines format (see below)
- **Apache Spark**: Direct Delta Lake access
- **Databricks**: Native Delta Lake support
- **Presto/Trino**: Query Delta Lake tables
- **AWS Athena**: Serverless SQL queries
## Korvet CLI

The Korvet CLI provides commands for listing and exporting archived data from Delta Lake.

### Installation

The CLI is built as part of the `korvet-cli` module:

```bash
./gradlew :korvet-cli:installDist
```

The CLI is available at `korvet-cli/build/install/korvet-cli/bin/korvet-cli`, or use the `./korvet` wrapper script.
### Listing Archived Data (`s3 ls`)

List messages or get summary statistics from a Delta Lake archive:

```bash
# List messages
./korvet s3 ls s3a://my-bucket/korvet/delta/korvet/stream/orders/0/ --region us-west-2

# Show summary only (fast: reads only metadata)
./korvet s3 ls s3a://my-bucket/korvet/delta/korvet/stream/orders/0/ --region us-west-2 --summarize --max-items 0

# List parquet files without scanning content
./korvet s3 ls s3a://my-bucket/korvet/delta/korvet/stream/orders/0/ --region us-west-2 --files-only
```
| Option | Description |
|---|---|
| `--region` | AWS region (required for S3) |
| `--summarize` | Show summary statistics (total messages, first/last IDs) |
| `--max-items` | Maximum messages to list. Use `0` with `--summarize` for summary only |
| `--start` | Start after this message ID, exclusive |
| `--files-only` | List parquet files without scanning content (fast) |
| | Long format with size and date |
| | Human-readable sizes |
### Exporting Archived Data (`s3 cp`)

Export messages from Delta Lake to JSON Lines format:

```bash
# Export to local file (gzipped)
./korvet s3 cp s3a://my-bucket/korvet/delta/korvet/stream/orders/0/ ./orders.jsonl.gz --region us-west-2

# Export to S3
./korvet s3 cp s3a://my-bucket/korvet/delta/korvet/stream/orders/0/ s3://dest-bucket/orders.jsonl.gz --region us-west-2

# Export with limits
./korvet s3 cp s3a://my-bucket/korvet/delta/korvet/stream/orders/0/ ./orders.jsonl.gz --region us-west-2 --max-items 10000

# Export a time range
./korvet s3 cp s3a://my-bucket/korvet/delta/korvet/stream/orders/0/ ./orders.jsonl.gz --region us-west-2 \
  --start 1774838105438 --end 1774841104039

# Export all partitions recursively
./korvet s3 cp s3a://my-bucket/korvet/delta/korvet/stream/orders/ ./orders/ --region us-west-2 --recursive
```
| Option | Description |
|---|---|
| `--region` | AWS region (required for S3) |
| `--start` | Start after this message ID, exclusive |
| `--end` | Stop at this message ID, inclusive. Same format as `--start` |
| `--max-items` | Maximum messages to export |
| `--recursive` | Export all Delta tables under the source prefix |
| | Value type: `JSON` or `RAW` |
| | Storage compression for RAW values: `ZSTD`, `GZIP`, `LZ4`, or `Snappy` |
| | Show what would be exported without writing |
| | Suppress progress output |
### Value Type Detection

The CLI automatically detects how values were stored:

- **JSON**: The message body was flattened into multiple fields. Each field is exported as-is.
- **RAW**: The message body was stored as compressed bytes in the `__value` field. The CLI decompresses and exports the original value.

Storage compression (ZSTD, GZIP, LZ4, Snappy) is auto-detected from magic bytes in the stored data.
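Magic-byte sniffing of this kind can be sketched as below; the byte prefixes are the standard frame signatures for each format, but the `sniff_compression` helper itself is illustrative, not Korvet's code:

```python
# Standard frame signatures: the first bytes of a compressed stream.
MAGIC = [
    (b"\x28\xb5\x2f\xfd", "ZSTD"),          # Zstandard frame
    (b"\x1f\x8b", "GZIP"),                  # gzip
    (b"\x04\x22\x4d\x18", "LZ4"),           # LZ4 frame
    (b"\xff\x06\x00\x00sNaPpY", "SNAPPY"),  # Snappy framing format
]

def sniff_compression(data: bytes) -> str:
    """Identify the compression codec from a stored payload's prefix."""
    for prefix, name in MAGIC:
        if data.startswith(prefix):
            return name
    return "NONE"  # plain/uncompressed payload
```

One caveat: raw (unframed) Snappy blocks have no magic bytes, so only the framing format is detectable this way.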
### Output Format

Messages are exported as JSON Lines (one JSON object per line):

```json
{"_id":"1774849698423-0","_ts":1774849698423,"status_code":500,"service":"inventory","path":"/health"}
{"_id":"1774849698423-1","_ts":1774849698423,"status_code":404,"service":"web-app","path":"/metrics"}
```

Each message includes:

- `_id`: Message ID (timestamp-sequence format)
- `_ts`: Timestamp in milliseconds
- The original message fields
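An export in this format can be consumed with any JSON Lines reader. As a sketch, here is a small Python reader for a gzipped export (the `orders.jsonl.gz` filename echoes the `s3 cp` examples above; the 500-counting snippet is a hypothetical use case):

```python
import gzip
import json

def read_jsonl_gz(path):
    """Yield one message dict per line from a gzipped JSON Lines export."""
    with gzip.open(path, "rt", encoding="utf-8") as f:
        for line in f:
            if line.strip():
                yield json.loads(line)

# Hypothetical usage: count HTTP 500s per service in an exported stream.
# counts = {}
# for msg in read_jsonl_gz("orders.jsonl.gz"):
#     if msg.get("status_code") == 500:
#         counts[msg["service"]] = counts.get(msg["service"], 0) + 1
```

Because each line is an independent JSON object, the file can also be processed in parallel or streamed without loading it all into memory.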
### Example: Reading with Spark

```scala
val df = spark.read.format("delta")
  .load("s3a://my-bucket/korvet/delta/korvet/stream/orders/0")

df.select("message_id", "timestamp", "fields")
  .filter($"timestamp" > 1708956789000L)
  .show()
```

See Databricks Integration for more examples.