Cold Storage (Delta Lake)

Korvet includes a built-in archival service that continuously archives messages from Redis to Delta Lake for long-term retention.

Overview

The cold storage archival service:

  1. Reads messages from Redis (hot or cold tier) using consumer groups

  2. Writes them to Delta Lake tables in Parquet format with ZSTD compression

  3. Supports S3, HDFS, or local filesystem storage

  4. Runs as part of the Korvet server—no additional infrastructure required
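The read-and-commit flow above can be sketched as a small model. This is an illustrative Python sketch, not Korvet's actual code; `BATCH_SIZE` and `FILES_PER_COMMIT` are stand-ins for the batch-size and files-per-commit tuning properties described below.

```python
# Illustrative model of the archival flow -- not Korvet's implementation.
# Messages are read in batches, each batch becomes one data file, and
# files are committed to the Delta transaction log in groups.
BATCH_SIZE = 3        # stand-in for the batch-size property
FILES_PER_COMMIT = 2  # stand-in for the files-per-commit property

def archive(messages):
    """Return the list of commits, each a list of files (message batches)."""
    commits, pending_files = [], []
    for i in range(0, len(messages), BATCH_SIZE):
        pending_files.append(messages[i:i + BATCH_SIZE])  # write one file
        if len(pending_files) == FILES_PER_COMMIT:
            commits.append(pending_files)                 # one Delta commit
            pending_files = []
    if pending_files:
        commits.append(pending_files)                     # flush the remainder
    return commits

commits = archive(list(range(10)))
print(len(commits), [len(c) for c in commits])  # 2 [2, 2]
```

Larger batch sizes and more files per commit reduce per-transaction overhead at the cost of latency before data becomes visible in the table.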

Configuration

Enable cold storage by setting the delta-path property in your application.yml:

korvet:
  server:
    delta-path: s3a://my-bucket/korvet/delta
    s3:
      region: us-west-1
    delta-archiver:
      consumer-group: korvet-delta-archiver
Cold storage is automatically enabled when delta-path is set. There is no separate enabled property.

Configuration Properties

| Property | Default | Description |
|----------|---------|-------------|
| korvet.server.delta-path | required | Delta Lake storage path. Supports file://, s3a://, or hdfs:// schemes. |
| korvet.server.s3.region | required for S3 | AWS region (e.g., us-west-1) |
| korvet.server.s3.endpoint | none | Custom S3 endpoint (for MinIO or LocalStack) |
| korvet.server.delta-archiver.consumer-group | korvet-delta-archiver | Consumer group name for tracking archival progress |

Tuning Properties

| Property | Default | Description |
|----------|---------|-------------|
| korvet.server.delta-archiver.read-workers | 1 | Number of parallel readers. Streams are partitioned across workers using consistent hashing. |
| korvet.server.delta-archiver.batch-size | 10000 | Messages per Redis XREADGROUP call |
| korvet.server.delta-archiver.files-per-commit | 10 | Parquet files per Delta Lake transaction |
| korvet.server.delta-archiver.block-duration-ms | 100 | Block duration for XREADGROUP in milliseconds |
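The consistent hashing used to spread streams across read workers can be illustrated with a minimal hash ring. This is a generic sketch, not Korvet's actual partitioning code; the worker and stream names are made up.

```python
import hashlib

def _hash(key: str) -> int:
    # Stable hash so assignments survive process restarts.
    return int(hashlib.md5(key.encode()).hexdigest(), 16)

class HashRing:
    """Minimal consistent-hash ring: each worker owns several points on the
    ring, and a stream is assigned to the first worker point at or after
    the stream's own hash (wrapping around)."""

    def __init__(self, workers, points_per_worker=64):
        self.ring = sorted(
            (_hash(f"{w}#{i}"), w)
            for w in workers
            for i in range(points_per_worker)
        )

    def worker_for(self, stream: str) -> str:
        h = _hash(stream)
        for point, worker in self.ring:
            if point >= h:
                return worker
        return self.ring[0][1]  # wrapped past the largest point

ring = HashRing(["reader-0", "reader-1", "reader-2"])
owner = ring.worker_for("korvet:stream:orders:0")
```

The practical consequence is the same as in Korvet: a given stream is always read by the same worker, so consumer-group progress for that stream is tracked in one place.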

Per-Topic Retention Configuration

Control when data moves from hot/cold tiers to cold storage using topic-level configuration:

| Configuration | Default | Description |
|---------------|---------|-------------|
| remote.storage.enable | false | Enable tiered storage for this topic (Kafka KIP-405 standard) |
| local.retention.ms | -2 | Time to keep in the hot tier before archiving. -2 means use the total retention.ms (Kafka KIP-405) |
| local.retention.bytes | -2 | Size to keep in the hot tier before archiving. -2 means use the total retention.bytes (Kafka KIP-405) |
| retention.ms | 604800000 (7 days) | Total retention across all tiers. Data is deleted after this time (Kafka standard) |

Remote tier retention is implicit: remote_retention = retention.ms - local.retention.ms

Example: Keep 1 day local, ~364 days remote (1 year total):

kafka-configs --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name my-topic --alter \
  --add-config remote.storage.enable=true,retention.ms=31536000000,local.retention.ms=86400000
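The numbers in this command can be checked against the implicit remote-retention formula above:

```python
# Verify the retention arithmetic from the example command above.
retention_ms = 31_536_000_000      # retention.ms: 365 days total
local_retention_ms = 86_400_000    # local.retention.ms: 1 day in the hot tier
ms_per_day = 24 * 60 * 60 * 1000

remote_retention_ms = retention_ms - local_retention_ms
print(remote_retention_ms // ms_per_day)  # 364 days served only from cold storage
```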

Storage Format

Delta Lake Table Structure

Messages are stored in Delta Lake tables organized by stream:

s3a://my-bucket/korvet/delta/
└── korvet/stream/orders/0/           # Topic "orders", partition 0
    ├── _delta_log/                   # Delta transaction log
    │   ├── 00000000000000000000.json
    │   └── ...
    ├── part-xxxxx.zstd.parquet       # Data files
    └── ...

Parquet Schema

Each Parquet file contains the following columns:

| Column | Type | Description |
|--------|------|-------------|
| stream_key | STRING | Redis stream key (e.g., korvet:stream:orders:0) |
| message_id | STRING | Redis stream message ID (e.g., 1708956789123-0) |
| timestamp | LONG | Message timestamp in milliseconds |
| fields | MAP<STRING, BINARY> | Message fields as key-value pairs |
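Because message_id preserves the original Redis stream ID, whose first component is a millisecond Unix timestamp, the ID can still be decomposed after archival. A small sketch, using the sample ID from the table:

```python
from datetime import datetime, timezone

def parse_stream_id(message_id: str):
    """Split a Redis stream ID such as '1708956789123-0' into its
    millisecond timestamp and per-millisecond sequence number."""
    ms, seq = message_id.split("-")
    return int(ms), int(seq)

ts_ms, seq = parse_stream_id("1708956789123-0")
produced_at = datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)
```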

Compression

Data is compressed using ZSTD, achieving approximately 50:1 compression ratio for typical message payloads.

AWS Authentication

The archival service uses AWS SDK v2 (via Hadoop 3.4+) and supports multiple authentication methods.

Credential Types

| Type | Description |
|------|-------------|
| auto (default) | Auto-detect credentials. Tries in order: static credentials in config, IRSA (EKS), IMDS (EC2/ECS), environment variables. |
| iam | EC2 Instance Metadata Service (IMDS) only. Use on EC2, ECS, or EKS with node IAM roles. |
| irsa | EKS IAM Roles for Service Accounts. Uses the AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE environment variables. |
| static | Explicit access key and secret. Use for development, MinIO, or LocalStack. |

IRSA (EKS IAM Roles for Service Accounts)

For EKS deployments, IRSA provides pod-level IAM permissions without sharing node-level credentials.

korvet:
  storage:
    remote:
      path: s3a://my-bucket/korvet/delta
      s3:
        region: us-east-1
        credentials:
          type: irsa  # Or use 'auto' (default) which auto-detects IRSA

When IRSA is configured on your EKS cluster, the following environment variables are automatically injected into pods:

  • AWS_ROLE_ARN - The IAM role ARN to assume

  • AWS_WEB_IDENTITY_TOKEN_FILE - Path to the OIDC token file

Korvet will automatically detect these and use WebIdentityTokenCredentialsProvider.

IAM Role (EC2/ECS/EKS Nodes)

On EC2, ECS, or EKS with node-level IAM roles, the service automatically uses instance/task/pod IAM roles via IMDS:

korvet:
  storage:
    remote:
      path: s3a://my-bucket/korvet/delta
      s3:
        region: us-west-1
        credentials:
          type: iam  # Explicit IMDS, or use 'auto' (default)

Environment Variables

Set AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and optionally AWS_SESSION_TOKEN:

export AWS_ACCESS_KEY_ID=AKIA...
export AWS_SECRET_ACCESS_KEY=...
export AWS_REGION=us-west-1

When using the default auto credential type, these environment variables are automatically detected.

Static Credentials

For development or MinIO, configure static credentials:

korvet:
  storage:
    remote:
      path: s3a://my-bucket/korvet/delta
      s3:
        region: us-west-1
        endpoint: http://localhost:9000
        credentials:
          type: static
          access-key-id: minioadmin
          secret-access-key: minioadmin

Troubleshooting S3 Access

Korvet provides a diagnostic endpoint to troubleshoot S3 configuration and connectivity issues.

S3 Diagnostics Endpoint

Access the endpoint at /actuator/s3diagnostics:

curl http://localhost:8080/actuator/s3diagnostics | jq .

Example response:

{
  "timestamp": "2026-03-19T10:30:00Z",
  "configuration": {
    "remoteStorageConfigured": true,
    "path": "s3a://my-bucket/delta",
    "region": "us-east-1",
    "credentialsType": "AUTO",
    "environment": {
      "irsaDetected": true,
      "AWS_ROLE_ARN": "arn:aws:iam::123456789012:role/korvet-s3-role",
      "AWS_WEB_IDENTITY_TOKEN_FILE": "/var/run/secrets/...",
      "tokenFileExists": true,
      "staticCredentialsDetected": false
    }
  },
  "bucket": "my-bucket",
  "prefix": "delta",
  "identity": {
    "status": "OK",
    "account": "123456789012",
    "arn": "arn:aws:sts::123456789012:assumed-role/korvet-s3-role/...",
    "userId": "AROA..."
  },
  "s3Access": {
    "headBucket": {
      "status": "OK",
      "message": "Bucket exists and is accessible"
    },
    "listObjects": {
      "status": "OK",
      "objectCount": 5,
      "message": "ListBucket permission verified"
    }
  },
  "status": "OK"
}

Common Issues

Region not configured

Ensure korvet.storage.remote.s3.region is set, or set AWS_REGION environment variable.

IRSA not detected

Verify that AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE environment variables are set and the token file exists.

Permission denied (403)

The IAM role/user lacks required S3 permissions. Required permissions:

  • s3:ListBucket on arn:aws:s3:::bucket-name

  • s3:GetObject on arn:aws:s3:::bucket-name/*

  • s3:PutObject on arn:aws:s3:::bucket-name/*

  • s3:DeleteObject on arn:aws:s3:::bucket-name/*
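A minimal IAM policy granting these four permissions might look like the following; bucket-name is a placeholder for your actual bucket:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:ListBucket"],
      "Resource": "arn:aws:s3:::bucket-name"
    },
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
      "Resource": "arn:aws:s3:::bucket-name/*"
    }
  ]
}
```

Note that s3:ListBucket applies to the bucket ARN itself, while the object-level actions apply to the bucket-name/* object ARN; mixing these up is a common cause of 403 errors.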

Identity shows wrong role

On EKS, ensure the ServiceAccount is properly annotated with eks.amazonaws.com/role-arn and the pod is using that ServiceAccount.

Performance

The archival service achieves high throughput when archiving to same-region S3:

| Configuration | Throughput | Notes |
|---------------|------------|-------|
| Single stream | ~32,000 msg/s | Baseline performance |
| 4 streams (parallel) | ~115,000 msg/s | Near-linear scaling |

See Cold Storage Benchmarks for detailed results.

Performance Tips

  • Same-region S3: Deploy Korvet in the same AWS region as your S3 bucket

  • Multiple streams: Use multiple topic partitions for parallel archival

  • Batch size: Larger batches (10,000+ messages) improve throughput

  • Worker count: Match the read-workers setting to your stream count

Reading Archived Data

Archived data can be read using:

  • Apache Spark: Direct Delta Lake access

  • Databricks: Native Delta Lake support

  • Presto/Trino: Query Delta Lake tables

  • AWS Athena: Serverless SQL queries

Example: Reading with Spark

import spark.implicits._  // needed for the $"..." column syntax below

val df = spark.read.format("delta")
  .load("s3a://my-bucket/korvet/delta/korvet/stream/orders/0")

df.select("message_id", "timestamp", "fields")
  .filter($"timestamp" > 1708956789000L)
  .show()

See Databricks Integration for more examples.