Cold Storage (Delta Lake)

Korvet includes a built-in archival service that continuously archives messages from Redis to Delta Lake for long-term retention.

Overview

The cold storage archival service:

  1. Reads messages from Redis (hot or warm tier) using consumer groups

  2. Writes them to Delta Lake tables in Parquet format with ZSTD compression

  3. Supports S3, HDFS, or local filesystem storage

  4. Runs as part of the Korvet server—no additional infrastructure required

When a warm tier is configured, the cold archiver reads from the warm tier instead of the hot tier. This enables a 3-tier pipeline: hot (RAM) → warm (disk-backed Redis) → cold (Delta Lake).

Configuration

Enable cold storage by setting the path property in your application.yml:

korvet:
  server:
    storage:
      path: s3a://my-bucket/korvet/delta
      s3:
        region: us-west-1
      archiver:
        consumer-group: korvet-archiver
Cold storage is automatically enabled when path is set. There is no separate enabled property.
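
For local development or testing, the same property can point at the local filesystem instead of S3. A minimal sketch; the file:// path below is only an example:

korvet:
  server:
    storage:
      # Example local path; any writable directory works for development.
      path: file:///var/lib/korvet/delta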

Configuration Properties

Property | Default | Description
korvet.server.storage.path | required | Delta Lake storage path. Supports file://, s3a://, or hdfs:// schemes.
korvet.server.storage.s3.region | required for S3 | AWS region (e.g., us-west-1)
korvet.server.storage.s3.endpoint | none | Custom S3 endpoint (for MinIO or LocalStack)
korvet.server.storage.archiver.consumer-group | korvet-archiver | Consumer group name for tracking archival progress

Tuning Properties

Property | Default | Description
korvet.server.storage.archiver.read-workers | 1 | Number of parallel readers (set to the number of streams/partitions)
korvet.server.storage.archiver.commit-workers | 4 | Number of parallel writers to S3
korvet.server.storage.archiver.batch-size | 10000 | Messages per Redis XREADGROUP call
korvet.server.storage.archiver.max-batches-per-commit | 10 | Batches to accumulate before writing a Parquet file
korvet.server.storage.archiver.files-per-commit | 10 | Parquet files per Delta Lake transaction
korvet.server.storage.archiver.block-duration-ms | 100 | Block duration for XREADGROUP in milliseconds
korvet.server.storage.archiver.stream-refresh-interval-ms | 30000 | Interval to refresh the stream list in milliseconds
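
For reference, the archiver properties live under korvet.server.storage.archiver in application.yml. The sketch below shows illustrative values for a topic with four partitions; the numbers are example choices, not recommendations:

korvet:
  server:
    storage:
      path: s3a://my-bucket/korvet/delta
      s3:
        region: us-west-1
      archiver:
        read-workers: 4             # one reader per stream/partition
        commit-workers: 4           # parallel writers to S3
        batch-size: 10000           # messages per XREADGROUP call
        max-batches-per-commit: 10  # batches accumulated per Parquet file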

Per-Topic Retention Configuration

Control when data moves from hot/warm tiers to cold storage using topic-level configuration:

Configuration | Default | Description
remote.storage.enable | false | Enable tiered storage for this topic (Kafka KIP-405 standard)
local.retention.ms | -2 | Time to keep in hot tier before archiving. -2 means use total retention.ms (Kafka KIP-405)
local.retention.bytes | -2 | Size to keep in hot tier before archiving. -2 means use total retention.bytes (Kafka KIP-405)
near.retention.ms | -2 | Time to keep in warm tier before archiving to cold. -2 means skip warm tier (Korvet extension)
near.retention.bytes | -2 | Size to keep in warm tier before archiving. -2 means skip warm tier (Korvet extension)
retention.ms | 604800000 (7 days) | Total retention across all tiers. Data is deleted after this time (Kafka standard)

Cold tier retention is implicit: cold_retention = retention.ms - local.retention.ms - near.retention.ms

Example: Keep 1 day hot, 7 days warm, ~357 days cold (1 year total):

kafka-configs --bootstrap-server localhost:9092 \
  --entity-type topics --entity-name my-topic --alter \
  --add-config remote.storage.enable=true,retention.ms=31536000000,local.retention.ms=86400000,near.retention.ms=604800000

Storage Format

Delta Lake Table Structure

Messages are stored in Delta Lake tables organized by stream:

s3a://my-bucket/korvet/delta/
└── korvet/stream/orders/0/           # Topic "orders", partition 0
    ├── _delta_log/                   # Delta transaction log
    │   ├── 00000000000000000000.json
    │   └── ...
    ├── part-xxxxx.zstd.parquet       # Data files
    └── ...

Parquet Schema

Each Parquet file contains the following columns:

Column | Type | Description
stream_key | STRING | Redis stream key (e.g., korvet:stream:orders:0)
message_id | STRING | Redis stream message ID (e.g., 1708956789123-0)
timestamp | LONG | Message timestamp in milliseconds
fields | MAP<STRING, BINARY> | Message fields as key-value pairs

Compression

Data is compressed using ZSTD, achieving a compression ratio of approximately 50:1 for typical message payloads.

AWS Authentication

The archival service uses AWS SDK v2 (via Hadoop 3.4+) and supports multiple authentication methods:

IAM Roles

On EC2, ECS, or EKS, the service automatically uses instance/task/pod IAM roles via the default credential chain. No additional configuration is required.

Korvet uses IAMInstanceCredentialsProvider from Hadoop's S3A connector, which leverages AWS SDK v2 for optimal performance and security.

Environment Variables

Set AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and optionally AWS_SESSION_TOKEN:

export AWS_ACCESS_KEY_ID=AKIA...
export AWS_SECRET_ACCESS_KEY=...
export AWS_REGION=us-west-1

Static Credentials

For development or MinIO, configure static credentials:

korvet:
  server:
    storage:
      path: s3a://my-bucket/korvet/delta
      s3:
        region: us-west-1
        endpoint: http://localhost:9000
        credentials:
          type: static
          access-key-id: minioadmin
          secret-access-key: minioadmin

Performance

The archival service achieves high throughput when archiving to same-region S3:

Configuration | Throughput | Notes
Single stream | ~32,000 msg/s | Baseline performance
4 streams (parallel) | ~115,000 msg/s | Near-linear scaling

See Cold Storage Benchmarks for detailed results.

Performance Tips

  • Same-region S3: Deploy Korvet in the same AWS region as your S3 bucket

  • Multiple streams: Use multiple topic partitions for parallel archival

  • Batch size: Larger batches (10,000+ messages) improve throughput

  • Worker count: Match read-workers and commit-workers to your stream count

Reading Archived Data

Archived data can be read using:

  • Apache Spark: Direct Delta Lake access

  • Databricks: Native Delta Lake support

  • Presto/Trino: Query Delta Lake tables

  • AWS Athena: Serverless SQL queries

Example: Reading with Spark

import spark.implicits._  // enables the $"column" syntax on an existing SparkSession

val df = spark.read.format("delta")
  .load("s3a://my-bucket/korvet/delta/korvet/stream/orders/0")

df.select("message_id", "timestamp", "fields")
  .filter($"timestamp" > 1708956789000L)
  .show()

See Databricks Integration for more examples.