Cold Storage (Delta Lake)

Korvet includes a built-in archival service that continuously archives messages from Redis to Delta Lake for long-term retention.

Overview

The cold storage archival service:

  1. Reads messages from Redis (hot or cold tier) using consumer groups

  2. Writes them to Delta Lake tables in Parquet format with ZSTD compression

  3. Supports S3, HDFS, or local filesystem storage

  4. Runs as part of the Korvet server—no additional infrastructure required

Configuration

Enable cold storage by setting the delta-path property in your application.yml:

korvet:
  server:
    delta-path: s3a://my-bucket/korvet/delta
    s3:
      region: us-west-1
    delta-archiver:
      consumer-group: korvet-delta-archiver
Cold storage is automatically enabled when delta-path is set. There is no separate enabled property.

Configuration Properties

korvet.server.delta-path (required)
    Delta Lake storage path. Supports file://, s3a://, or hdfs:// schemes.

korvet.server.s3.region (required for S3)
    AWS region (e.g., us-west-1)

korvet.server.s3.endpoint (default: none)
    Custom S3 endpoint (for MinIO or LocalStack)

korvet.server.delta-archiver.consumer-group (default: korvet-delta-archiver)
    Consumer group name for tracking archival progress

Tuning Properties

korvet.server.delta-archiver.read-workers (default: 1)
    Number of parallel readers. Streams are partitioned across workers using consistent hashing.

korvet.server.delta-archiver.batch-size (default: 10000)
    Messages per Redis XREADGROUP call

korvet.server.delta-archiver.files-per-commit (default: 10)
    Parquet files per Delta Lake transaction

korvet.server.delta-archiver.block-duration-ms (default: 100)
    Block duration for XREADGROUP in milliseconds
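One way to picture how streams are partitioned across read workers is a stable hash of the stream key modulo the worker count. This is an illustrative sketch only, not Korvet's actual implementation (true consistent hashing additionally minimizes reassignment when the worker count changes):

```python
import hashlib

def assign_worker(stream_key: str, read_workers: int) -> int:
    """Map a stream key to a worker index using a stable hash.

    Illustrative only; Korvet's real partitioning scheme may differ.
    A stable hash (unlike Python's builtin hash()) yields the same
    assignment across processes and restarts.
    """
    digest = hashlib.sha256(stream_key.encode()).digest()
    return int.from_bytes(digest[:8], "big") % read_workers

# Every message of a given stream is always read by the same worker,
# so per-stream ordering is preserved during archival.
assignments = {key: assign_worker(key, 4) for key in
               ["korvet:stream:orders:0", "korvet:stream:orders:1"]}
```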

Per-Topic Retention Configuration

Control when data moves from hot/cold tiers to cold storage using topic-level configuration:

remote.storage.enable (default: false)
    Enable tiered storage for this topic (Kafka KIP-405 standard)

local.retention.ms (default: -2)
    Time to keep in the hot tier before archiving. -2 means use the total retention.ms (Kafka KIP-405)

local.retention.bytes (default: -2)
    Size to keep in the local tier before archiving. -2 means use the total retention.bytes (Kafka KIP-405)

retention.ms (default: 604800000, i.e. 7 days)
    Total retention across all tiers. Data is deleted after this time (Kafka standard)

Remote tier retention is implicit: remote_retention = retention.ms - local.retention.ms
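The arithmetic, including the -2 sentinel, can be sketched as follows (an illustrative helper, not part of Korvet):

```python
def remote_retention_ms(retention_ms: int, local_retention_ms: int) -> int:
    """Compute the implicit remote-tier retention for a topic.

    local_retention_ms == -2 means "use the total retention.ms"
    (KIP-405), in which case nothing is left for the remote tier.
    """
    if local_retention_ms == -2:
        local_retention_ms = retention_ms
    return retention_ms - local_retention_ms

DAY_MS = 86_400_000
# 1 year total, 1 day local -> 364 days remote
print(remote_retention_ms(365 * DAY_MS, DAY_MS) // DAY_MS)  # -> 364
```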

Example: Keep 1 day local, ~364 days remote (1 year total):

kafka-configs --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name my-topic --alter \
  --add-config remote.storage.enable=true,retention.ms=31536000000,local.retention.ms=86400000

Storage Format

Delta Lake Table Structure

Messages are stored in Delta Lake tables organized by stream:

s3a://my-bucket/korvet/delta/
└── korvet/stream/orders/0/           # Topic "orders", partition 0
    ├── _delta_log/                   # Delta transaction log
    │   ├── 00000000000000000000.json
    │   └── ...
    ├── part-xxxxx.zstd.parquet       # Data files
    └── ...

Parquet Schema

Each Parquet file contains the following columns:

stream_key (STRING)
    Redis stream key (e.g., korvet:stream:orders:0)

message_id (STRING)
    Redis stream message ID (e.g., 1708956789123-0)

timestamp (LONG)
    Message timestamp in milliseconds

fields (MAP<STRING, BINARY>)
    Message fields as key-value pairs
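Because message_id uses the standard Redis stream ID format (millisecond timestamp, dash, sequence number), it can be split back into its parts. A small helper for illustration (not part of Korvet):

```python
def parse_stream_id(message_id: str) -> tuple[int, int]:
    """Split a Redis stream ID like '1708956789123-0' into
    (timestamp_ms, sequence)."""
    ts, seq = message_id.split("-")
    return int(ts), int(seq)

print(parse_stream_id("1708956789123-0"))  # -> (1708956789123, 0)
```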

Compression

Data is compressed using ZSTD, achieving approximately 50:1 compression ratio for typical message payloads.

AWS Authentication

The archival service uses AWS SDK v2 (via Hadoop 3.4+) and supports multiple authentication methods.

Credential Types

auto (default)
    Auto-detect credentials. Tries in order: static credentials in config, IRSA (EKS), IMDS (EC2/ECS), environment variables.

iam
    EC2 Instance Metadata Service (IMDS) only. Use on EC2, ECS, or EKS with node IAM roles.

irsa
    EKS IAM Roles for Service Accounts. Uses the AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE environment variables.

static
    Explicit access key and secret. Use for development, MinIO, or LocalStack.
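The auto detection order can be sketched as follows. This is an illustrative approximation, not Korvet's actual code; the config dict and function name are hypothetical, and IMDS is placed as the final fallback here because its availability can only be probed at runtime (the documented order checks IMDS before environment variables):

```python
import os

def detect_credentials_source(config: dict, env=None) -> str:
    """Approximate the documented 'auto' credential detection:
    static config -> IRSA -> environment variables -> IMDS fallback.

    Illustrative only; Korvet's real detection logic may differ.
    """
    if env is None:
        env = dict(os.environ)
    if config.get("access-key-id") and config.get("secret-access-key"):
        return "static"
    if env.get("AWS_ROLE_ARN") and env.get("AWS_WEB_IDENTITY_TOKEN_FILE"):
        return "irsa"
    if env.get("AWS_ACCESS_KEY_ID") and env.get("AWS_SECRET_ACCESS_KEY"):
        return "environment"
    return "iam"  # fall back to instance/task/pod roles via IMDS
```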

IRSA (EKS IAM Roles for Service Accounts)

For EKS deployments, IRSA provides pod-level IAM permissions without sharing node-level credentials.

korvet:
  storage:
    remote:
      path: s3a://my-bucket/korvet/delta
      s3:
        region: us-east-1
        credentials:
          type: irsa  # Or use 'auto' (default) which auto-detects IRSA

When IRSA is configured on your EKS cluster, the following environment variables are automatically injected into pods:

  • AWS_ROLE_ARN - The IAM role ARN to assume

  • AWS_WEB_IDENTITY_TOKEN_FILE - Path to the OIDC token file

Korvet will automatically detect these and use WebIdentityTokenCredentialsProvider.

IAM Role (EC2/ECS/EKS Nodes)

On EC2, ECS, or EKS with node-level IAM roles, the service automatically uses instance/task/pod IAM roles via IMDS:

korvet:
  storage:
    remote:
      path: s3a://my-bucket/korvet/delta
      s3:
        region: us-west-1
        credentials:
          type: iam  # Explicit IMDS, or use 'auto' (default)

Environment Variables

Set AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and optionally AWS_SESSION_TOKEN:

export AWS_ACCESS_KEY_ID=AKIA...
export AWS_SECRET_ACCESS_KEY=...
export AWS_REGION=us-west-1

When using the default auto credential type, these environment variables are automatically detected.

Static Credentials

For development or MinIO, configure static credentials:

korvet:
  storage:
    remote:
      path: s3a://my-bucket/korvet/delta
      s3:
        region: us-west-1
        endpoint: http://localhost:9000
        credentials:
          type: static
          access-key-id: minioadmin
          secret-access-key: minioadmin

Troubleshooting S3 Access

Korvet provides a diagnostic endpoint to troubleshoot S3 configuration and connectivity issues.

S3 Diagnostics Endpoint

Access the endpoint at /actuator/s3diagnostics:

curl http://localhost:8080/actuator/s3diagnostics | jq .

Example response:

{
  "timestamp": "2026-03-19T10:30:00Z",
  "configuration": {
    "remoteStorageConfigured": true,
    "path": "s3a://my-bucket/delta",
    "region": "us-east-1",
    "credentialsType": "AUTO",
    "environment": {
      "irsaDetected": true,
      "AWS_ROLE_ARN": "arn:aws:iam::123456789012:role/korvet-s3-role",
      "AWS_WEB_IDENTITY_TOKEN_FILE": "/var/run/secrets/...",
      "tokenFileExists": true,
      "staticCredentialsDetected": false
    }
  },
  "bucket": "my-bucket",
  "prefix": "delta",
  "identity": {
    "status": "OK",
    "account": "123456789012",
    "arn": "arn:aws:sts::123456789012:assumed-role/korvet-s3-role/...",
    "userId": "AROA..."
  },
  "s3Access": {
    "headBucket": {
      "status": "OK",
      "message": "Bucket exists and is accessible"
    },
    "listObjects": {
      "status": "OK",
      "objectCount": 5,
      "message": "ListBucket permission verified"
    }
  },
  "status": "OK"
}

Common Issues

Region not configured

Ensure korvet.storage.remote.s3.region is set, or set AWS_REGION environment variable.

IRSA not detected

Verify that AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE environment variables are set and the token file exists.

Permission denied (403)

The IAM role/user lacks required S3 permissions. Required permissions:

  • s3:ListBucket on arn:aws:s3:::bucket-name

  • s3:GetObject on arn:aws:s3:::bucket-name/*

  • s3:PutObject on arn:aws:s3:::bucket-name/*

  • s3:DeleteObject on arn:aws:s3:::bucket-name/*
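A minimal IAM policy granting these permissions might look like the following (bucket-name is a placeholder for your bucket):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:ListBucket"],
      "Resource": "arn:aws:s3:::bucket-name"
    },
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
      "Resource": "arn:aws:s3:::bucket-name/*"
    }
  ]
}
```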

Identity shows wrong role

On EKS, ensure the ServiceAccount is properly annotated with eks.amazonaws.com/role-arn and the pod is using that ServiceAccount.

Performance

The archival service achieves high throughput when archiving to same-region S3:

Single stream: ~32,000 msg/s (baseline performance)

4 streams (parallel): ~115,000 msg/s (near-linear scaling)

See Cold Storage Benchmarks for detailed results.

Performance Tips

  • Same-region S3: Deploy Korvet in the same AWS region as your S3 bucket

  • Multiple streams: Use multiple topic partitions for parallel archival

  • Batch size: Larger batches (10,000+ messages) improve throughput

  • Worker count: Set read-workers to match the number of streams being archived
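Putting these tips together, a tuned configuration for, say, four partitions might look like this (values are illustrative starting points, not recommendations):

```yaml
korvet:
  server:
    delta-path: s3a://my-bucket/korvet/delta
    delta-archiver:
      read-workers: 4        # one worker per stream/partition
      batch-size: 10000      # larger batches improve throughput
      files-per-commit: 10
```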

Reading Archived Data

Archived data can be read using:

  • Korvet CLI: Export to JSON Lines format (see below)

  • Apache Spark: Direct Delta Lake access

  • Databricks: Native Delta Lake support

  • Presto/Trino: Query Delta Lake tables

  • AWS Athena: Serverless SQL queries

Korvet CLI

The Korvet CLI provides commands for listing and exporting archived data from Delta Lake.

Installation

The CLI is built as part of the korvet-cli module:

./gradlew :korvet-cli:installDist

The CLI is available at korvet-cli/build/install/korvet-cli/bin/korvet-cli, or use the ./korvet wrapper script.

Listing Archived Data (s3 ls)

List messages or get summary statistics from a Delta Lake archive:

# List messages
./korvet s3 ls s3a://my-bucket/korvet/delta/korvet/stream/orders/0/ --region us-west-2

# Show summary only (fast - just metadata)
./korvet s3 ls s3a://my-bucket/korvet/delta/korvet/stream/orders/0/ --region us-west-2 --summarize --max-items 0

# List parquet files without scanning content
./korvet s3 ls s3a://my-bucket/korvet/delta/korvet/stream/orders/0/ --region us-west-2 --files-only

Table 1. s3 ls options

--region <region>
    AWS region (required for S3)

--summarize
    Show summary statistics (total messages, first/last IDs)

--max-items <count>
    Maximum messages to list. Use 0 with --summarize for a metadata-only query.

--start <id>
    Start after this message ID, exclusive. Format: timestamp-seq (e.g., 1774849698423-0) or just the timestamp in milliseconds.

--files-only
    List Parquet files without scanning content (fast)

-l
    Long format with size and date

-h
    Human-readable sizes

Exporting Archived Data (s3 cp)

Export messages from Delta Lake to JSON Lines format:

# Export to local file (gzipped)
./korvet s3 cp s3a://my-bucket/korvet/delta/korvet/stream/orders/0/ ./orders.jsonl.gz --region us-west-2

# Export to S3
./korvet s3 cp s3a://my-bucket/korvet/delta/korvet/stream/orders/0/ s3://dest-bucket/orders.jsonl.gz --region us-west-2

# Export with limits
./korvet s3 cp s3a://my-bucket/korvet/delta/korvet/stream/orders/0/ ./orders.jsonl.gz --region us-west-2 --max-items 10000

# Export time range
./korvet s3 cp s3a://my-bucket/korvet/delta/korvet/stream/orders/0/ ./orders.jsonl.gz --region us-west-2 \
  --start 1774838105438 --end 1774841104039

# Export all partitions recursively
./korvet s3 cp s3a://my-bucket/korvet/delta/korvet/stream/orders/ ./orders/ --region us-west-2 --recursive

Table 2. s3 cp options

--region <region>
    AWS region (required for S3)

--start <id>
    Start after this message ID, exclusive. Format: timestamp-seq (e.g., 1774849698423-0) or just the timestamp in milliseconds.

--end <id>
    Stop at this message ID, inclusive. Same format as --start.

--max-items <count>
    Maximum messages to export

-r, --recursive
    Export all Delta tables under the source prefix

--value-type <type>
    Value type: AUTO (default), JSON, RAW. AUTO detects from the first message.

--storage-compression <type>
    Storage compression for RAW values: AUTO (default), NONE, GZIP, SNAPPY, LZ4, ZSTD. AUTO detects from magic bytes.

--dry-run
    Show what would be exported without writing

-s, --silent
    Suppress progress output

Value Type Detection

The CLI automatically detects how values were stored:

  • JSON: Message body was flattened into multiple fields. Each field is exported as-is.

  • RAW: Message body was stored as compressed bytes in __value field. The CLI decompresses and exports the original value.

Storage compression (ZSTD, GZIP, LZ4, Snappy) is auto-detected from magic bytes in the stored data.
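Magic-byte detection of this kind can be sketched as follows. The prefixes below are the standard magic numbers for each framed format; the function itself is illustrative, not Korvet's detection code:

```python
def detect_compression(data: bytes) -> str:
    """Guess the compression codec from a payload's leading bytes.

    Each prefix is the standard magic number of the corresponding
    framed format; anything else is treated as uncompressed.
    """
    magics = {
        b"\x28\xb5\x2f\xfd": "ZSTD",          # zstd frame
        b"\x1f\x8b": "GZIP",                   # gzip
        b"\x04\x22\x4d\x18": "LZ4",            # lz4 frame
        b"\xff\x06\x00\x00sNaPpY": "SNAPPY",   # snappy framing format
    }
    for magic, name in magics.items():
        if data.startswith(magic):
            return name
    return "NONE"

print(detect_compression(b"\x28\xb5\x2f\xfd\x00\x01"))  # -> ZSTD
```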

Output Format

Messages are exported as JSON Lines (one JSON object per line):

{"_id":"1774849698423-0","_ts":1774849698423,"status_code":500,"service":"inventory","path":"/health"}
{"_id":"1774849698423-1","_ts":1774849698423,"status_code":404,"service":"web-app","path":"/metrics"}

Each message includes:

  • _id: Message ID (timestamp-sequence format)

  • _ts: Timestamp in milliseconds

  • Original message fields
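Exported files can be consumed with any JSON Lines reader. For example, in Python (handling gzip, since exports ending in .gz are gzipped):

```python
import gzip
import json

def read_jsonl(path: str):
    """Yield one dict per exported message, transparently
    handling gzipped exports."""
    opener = gzip.open if path.endswith(".gz") else open
    with opener(path, "rt") as f:
        for line in f:
            yield json.loads(line)

# Each record carries _id, _ts, and the original message fields:
# for msg in read_jsonl("orders.jsonl.gz"):
#     print(msg["_id"], msg["_ts"], msg.get("status_code"))
```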

Example: Reading with Spark

val df = spark.read.format("delta")
  .load("s3a://my-bucket/korvet/delta/korvet/stream/orders/0")

df.select("message_id", "timestamp", "fields")
  .filter($"timestamp" > 1708956789000L)
  .show()

See Databricks Integration for more examples.