Cold Storage (Delta Lake)

Korvet includes a built-in archival service that continuously archives Redis stream data to Delta Lake for long-term retention.

Overview

The cold storage archival service:

  1. Reads messages from Redis streams using consumer groups

  2. Writes them to Delta Lake tables in Parquet format with ZSTD compression

  3. Supports S3, HDFS, or local filesystem storage

  4. Runs as part of the Korvet server, with no separate archiver process

When Redis Flex is in use, Redis itself handles hot/warm placement in RAM and Flash. Korvet’s archival service moves archived stream data from Redis to the Delta Lake remote tier.

Configuration

Enable cold storage by setting korvet.storage.remote.path in your application.yml:

korvet:
  storage:
    remote:
      path: s3a://my-bucket/korvet/delta
      s3:
        region: us-west-1
  archiver:
    scan-interval: 5m
    batch-size: 10000
    max-unarchived-age: 5m
    max-concurrent-streams: 1

Remote storage is enabled automatically when korvet.storage.remote.path is set. Topics are archived only when they also have remote.storage.enable=true (see Per-Topic Retention Configuration below).

Configuration Properties

| Property | Default | Description |
|---|---|---|
| korvet.storage.remote.path | required | Cold storage path. Supports file://, s3a://, or hdfs:// schemes. |
| korvet.storage.remote.s3.region | required for S3 | AWS region (for example, us-west-1) |
| korvet.storage.remote.s3.endpoint | none | Custom S3 endpoint for MinIO or LocalStack |
| korvet.storage.remote.s3.credentials.type | auto | Credential strategy: auto, iam, irsa, or static |
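
For local development without S3, the same path property accepts a filesystem location. A minimal sketch, assuming /var/lib/korvet/delta is a writable directory on the Korvet host (the path itself is only an example):

korvet:
  storage:
    remote:
      # file:// keeps Delta Lake tables on the local filesystem; no s3 settings are needed
      path: file:///var/lib/korvet/delta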

Tuning Properties

| Property | Default | Description |
|---|---|---|
| korvet.archiver.scan-interval | 5s | How often the archiver planner re-checks Redis streams for eligible data |
| korvet.archiver.batch-size | 100000 | Maximum records written per archive write |
| korvet.archiver.max-unarchived-age | 5m | Archive a stream once its oldest unarchived message reaches this age |
| korvet.storage.remote.writer.messages-per-file | 100000 | Maximum records written into a single Parquet file before rolling over |
| korvet.storage.remote.writer.target-file-size-bytes | 134217728 (128 MB) | Target size per Parquet file, so that default archival writes produce reasonably large files |
| korvet.archiver.max-concurrent-streams | 1 | Number of streams archived concurrently |
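
As an illustration of the writer settings above, the sketch below lowers the per-file targets, which can help low-volume streams roll over to new Parquet files at sensible sizes; the numbers are examples, not recommendations:

korvet:
  storage:
    remote:
      writer:
        # example: cap each Parquet file at 50,000 records
        messages-per-file: 50000
        # example: target roughly 32 MB of payload per file (default is 128 MB)
        target-file-size-bytes: 33554432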

Per-Topic Retention Configuration

Control when data moves from local Redis storage to Delta Lake using topic-level configuration:

| Configuration | Default | Description |
|---|---|---|
| remote.storage.enable | false | Enable tiered storage for this topic (Kafka KIP-405 standard) |
| local.retention.ms | -2 | How long data is kept in the local Redis tier before it expires. -2 means use the total retention.ms (Kafka KIP-405) |
| local.retention.bytes | -2 | How much data is kept in the local Redis tier before trimming. -2 means fall back to the total retention.bytes (Kafka KIP-405) |
| retention.ms | 604800000 (7 days) | Total retention across all tiers; data is deleted after this time (Kafka standard) |

Remote-tier retention is implicit: retention.ms - local.retention.ms.

Example: Keep 1 day local and the rest remote (1 year total):

kafka-configs --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name my-topic --alter \
  --add-config remote.storage.enable=true,retention.ms=31536000000,local.retention.ms=86400000

Storage Format

Delta Lake Table Structure

Messages are stored in Delta Lake tables organized by stream:

s3a://my-bucket/korvet/delta/
└── korvet/stream/orders/0/           # Topic "orders", partition 0
    ├── _delta_log/                   # Delta transaction log
    │   ├── 00000000000000000000.json
    │   └── ...
    ├── part-xxxxx.zstd.parquet       # Data files
    └── ...

Parquet Schema

Each Parquet file contains the following columns:

| Column | Type | Description |
|---|---|---|
| stream | STRING | Redis stream key (for example, korvet:stream:orders:0) |
| ts | LONG | Message timestamp extracted from the Redis message ID |
| seq | LONG | Message sequence extracted from the Redis message ID |
| body | MAP<STRING, BINARY> | Message fields as key-value pairs |

Compression

Data files are written with ZSTD Parquet compression.

AWS Authentication

The archival service uses Hadoop S3A configuration and supports multiple authentication methods.

Credential Types

| Type | Description |
|---|---|
| auto (default) | Auto-detect credentials. Tries configured static credentials first, then environment or platform-provided credentials. |
| iam | Use instance metadata credentials, typically for EC2, ECS, or node-level EKS IAM roles. |
| irsa | Use EKS IAM Roles for Service Accounts via AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE. |
| static | Use an explicit access key and secret. Useful for development, MinIO, or LocalStack. |

IRSA (EKS IAM Roles for Service Accounts)

For EKS deployments, IRSA provides pod-level IAM permissions without sharing node-level credentials.

korvet:
  storage:
    remote:
      path: s3a://my-bucket/korvet/delta
      s3:
        region: us-east-1
        credentials:
          type: irsa

When IRSA is configured on your EKS cluster, the following environment variables are injected into pods:

  • AWS_ROLE_ARN - The IAM role ARN to assume

  • AWS_WEB_IDENTITY_TOKEN_FILE - Path to the OIDC token file
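
A typical IRSA setup annotates the Kubernetes ServiceAccount that the Korvet pods run under; EKS then injects the two variables above. A sketch, with the role ARN, namespace, and ServiceAccount name as placeholders:

apiVersion: v1
kind: ServiceAccount
metadata:
  name: korvet            # referenced by the Korvet pod spec via serviceAccountName
  namespace: korvet
  annotations:
    # IAM role granting the S3 permissions listed under Troubleshooting S3 Access
    eks.amazonaws.com/role-arn: arn:aws:iam::123456789012:role/korvet-cold-storage

The pods must run under this ServiceAccount for the role ARN and token file to be injected.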

IAM Role (EC2/ECS/EKS Nodes)

On EC2, ECS, or EKS with node-level IAM roles, the service can use instance or task credentials via IMDS:

korvet:
  storage:
    remote:
      path: s3a://my-bucket/korvet/delta
      s3:
        region: us-west-1
        credentials:
          type: iam

Environment Variables

Set AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and optionally AWS_SESSION_TOKEN:

export AWS_ACCESS_KEY_ID=AKIA...
export AWS_SECRET_ACCESS_KEY=...
export AWS_REGION=us-west-1

When using the default auto credential type, these environment variables are detected automatically.
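
When credentials come from the environment like this, the application.yml can stay minimal; the default auto type picks them up, so no credentials block is needed:

korvet:
  storage:
    remote:
      path: s3a://my-bucket/korvet/delta
      s3:
        region: us-west-1
        # credentials.type defaults to auto and detects the exported AWS_* variables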

Static Credentials

For development or MinIO, configure static credentials:

korvet:
  storage:
    remote:
      path: s3a://my-bucket/korvet/delta
      s3:
        region: us-west-1
        endpoint: http://localhost:9000
        credentials:
          type: static
          access-key-id: minioadmin
          secret-access-key: minioadmin

Troubleshooting S3 Access

Korvet provides a diagnostic endpoint to troubleshoot S3 configuration and connectivity issues.

S3 Diagnostics Endpoint

Access the endpoint at /actuator/s3diagnostics:

curl http://localhost:8080/actuator/s3diagnostics | jq .

Common Issues

Region not configured

Ensure korvet.storage.remote.s3.region is set, or set AWS_REGION in the environment.

IRSA not detected

Verify that AWS_ROLE_ARN and AWS_WEB_IDENTITY_TOKEN_FILE are present and the token file exists.

Permission denied (403)

The IAM role or user lacks required S3 permissions. Typical permissions are:

  • s3:ListBucket on arn:aws:s3:::bucket-name

  • s3:GetObject on arn:aws:s3:::bucket-name/*

  • s3:PutObject on arn:aws:s3:::bucket-name/*

  • s3:DeleteObject on arn:aws:s3:::bucket-name/*

Identity shows wrong role

On EKS, ensure the ServiceAccount is annotated with eks.amazonaws.com/role-arn and the pod is using that ServiceAccount.

Performance

The archival service achieves high throughput when archiving to same-region S3:

| Configuration | Throughput | Notes |
|---|---|---|
| Single stream | ~32,000 msg/s | Baseline performance |
| 4 streams (parallel) | ~115,000 msg/s | Near-linear scaling |

See Cold Storage Benchmarks for detailed results.

Performance Tips

  • Same-region S3: Deploy Korvet in the same AWS region as your S3 bucket

  • Multiple streams: Use multiple topic partitions for parallel archival

  • Batch size: Larger archive batches can improve throughput

  • Archiver concurrency: Increase korvet.archiver.max-concurrent-streams to match your stream count
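
Putting the archiver tips together, here is a sketch of a higher-throughput setup for a topic with four partitions; the values are illustrative and should be validated against your own workload:

korvet:
  archiver:
    # archive up to four streams in parallel to match four topic partitions
    max-concurrent-streams: 4
    # larger batches reduce per-write overhead at the cost of memory
    batch-size: 200000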

Reading Archived Data

Archived data can be read using:

  • Apache Spark: Direct Delta Lake access

  • Databricks: Native Delta Lake support

  • Presto/Trino: Query Delta Lake tables

  • AWS Athena: Serverless SQL queries

Example: Reading with Spark

// Needed for the $"col" column syntax outside spark-shell
import spark.implicits._

val df = spark.read.format("delta")
  .load("s3a://my-bucket/korvet/delta/korvet/stream/orders/0")

// ts is the millisecond timestamp portion of the Redis message ID
df.select("stream", "ts", "seq", "body")
  .filter($"ts" > 1708956789000L)
  .show()

See Databricks Integration for more examples.