Configuration

You can configure read and write operations in both batch and streaming modes. Note that Redis streams only support streaming mode.

Specify Configuration

Using SparkConf

You can specify configuration options with SparkConf, typically when you create the SparkSession.
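
A minimal sketch of this approach, assuming a local Redis instance at redis://localhost:6379:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Options set on SparkConf apply to every Redis read and write in this session,
// unless overridden in an options map (see below)
val conf = new SparkConf()
  .set("spark.redis.read.connection.uri", "redis://localhost:6379")
  .set("spark.redis.write.connection.uri", "redis://localhost:6379")

val spark = SparkSession.builder()
  .config(conf)
  .getOrCreate()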

Using an Options Map

In the Spark API, the DataFrameReader, DataFrameWriter, DataStreamReader, and DataStreamWriter classes each contain an option() method. You can use this method to specify options for the underlying read or write operation.

Options specified in this way override any corresponding settings in SparkConf.

Short-Form Syntax

Options maps support short-form syntax. You may omit the prefix when specifying an option key string.

For example, the following calls are equivalent:

  • df.write.format("redis").option("spark.redis.write.type", "hash").save()

  • df.write.format("redis").option("spark.redis.type", "hash").save()

  • df.write.format("redis").option("type", "hash").save()

Configuration Exceptions

If the Spark Connector throws a ConfigException, confirm that your SparkConf or options map uses correct syntax and contains only valid configuration options.

Connection Options

The following options apply to both read and write operations.

redis.<read|write>.connection.uri

Redis URI in the form redis://username:password@host:port. For secure (TLS) connections, use rediss://.

redis.<read|write>.connection.cluster

Set to true when connecting to a Redis Cluster.
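
For example, a minimal sketch of a read from a Redis Cluster, using the short-form option keys described above (the host and credentials are placeholders):

val df = spark.read
  .format("redis")
  // Short form of spark.redis.read.connection.uri and spark.redis.read.connection.cluster
  .option("connection.uri", "redis://user:secret@redis.example.com:6379")
  .option("connection.cluster", "true")
  .load()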

TLS Connection Options

For secure (TLS) connections, use rediss:// as the Redis URI scheme.

redis.<read|write>.connection.ssl.cacert

Certificate file from which to load trusted certificates. The file must contain X.509 certificates in PEM format.

redis.<read|write>.connection.ssl.cert

X.509 certificate chain in PEM format to use for client authentication.

redis.<read|write>.connection.ssl.key

PKCS#8 private key in PEM format to use for client authentication.

redis.<read|write>.connection.ssl.key.password

Password for the private key if it’s password-protected.
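
A minimal sketch of a TLS connection with client authentication (hostname and file paths are placeholders):

val df = spark.read
  .format("redis")
  .option("connection.uri", "rediss://redis.example.com:6379")
  // Trusted CA certificates in PEM format
  .option("connection.ssl.cacert", "/etc/ssl/redis/ca.pem")
  // Client certificate chain and PKCS#8 private key in PEM format
  .option("connection.ssl.cert", "/etc/ssl/redis/client.pem")
  .option("connection.ssl.key", "/etc/ssl/redis/client.key")
  .option("connection.ssl.key.password", "changeit")
  .load()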

Read Options

spark.redis.read.type

Type of reader to use for reading data from Redis (KEYS or STREAM).

spark.redis.read.schema

Specifies known fields to use when inferring the schema from Redis, in the form <field1> <type>, <field2> <type> where <type> is one of STRING, TINYINT, SMALLINT, INT, BIGINT, BOOLEAN, FLOAT, DOUBLE, DATE, or TIMESTAMP.
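
For example, a minimal sketch that supplies known fields for schema inference (the field names are illustrative):

val df = spark.read
  .format("redis")
  .option("type", "KEYS")
  // Known fields and their types, in the "<field> <type>" format described above
  .option("schema", "name STRING, age INT, active BOOLEAN")
  .load()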

Redis Keys

With read type KEYS, the connector iterates over keys using the Redis SCAN command and then fetches the corresponding values.

spark.redis.read.keyPattern

Read keys matching the given glob-style pattern (default: *).

spark.redis.read.keyType

Read keys matching the given type, e.g. string, hash, or json (default: all types).

spark.redis.read.threads

Number of reader threads to use in parallel (default: 1).

spark.redis.read.batch

Number of keys whose values each thread fetches at a time in a single pipeline call.

spark.redis.read.pool

Maximum number of Redis connections to use across threads (default: 8).

spark.redis.read.scanCount

Number of keys to read at once on each scan call.

spark.redis.read.queueCapacity

Maximum number of values that the reader threads can queue up (default: 10000).

spark.redis.read.readFrom

Which Redis cluster nodes to read from. See Lettuce ReadFrom.
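
Putting these options together, a minimal sketch of a batch read over hash keys (the key pattern and tuning values are illustrative):

val df = spark.read
  .format("redis")
  .option("type", "KEYS")
  // Scan only hash keys matching user:*
  .option("keyPattern", "user:*")
  .option("keyType", "hash")
  // Fetch values with 4 parallel reader threads, 100 keys per pipeline call
  .option("threads", "4")
  .option("batch", "100")
  .load()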

Streaming

When using Spark streaming, the Redis Connector for Spark supports both micro-batch processing and continuous processing.

In this mode, the connector reads a change stream from Redis using keyspace notifications, in addition to the scan described previously.

spark.redis.read.eventQueueCapacity

Capacity of the keyspace notification queue (default: 10000).

spark.redis.read.idleTimeout

Minimum idle duration in milliseconds before the reader is considered complete.

spark.redis.read.flushInterval

Maximum duration in milliseconds between flushes.
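
A minimal sketch of a streaming read in this mode (it assumes keyspace notifications are enabled on the Redis server, e.g. via the notify-keyspace-events setting; the tuning values are illustrative):

val streamDF = spark.readStream
  .format("redis")
  .option("type", "KEYS")
  // Pick up changes to keys matching user:* as they happen
  .option("keyPattern", "user:*")
  .option("idleTimeout", "3000")
  .option("flushInterval", "500")
  .load()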

Redis Stream

Use read type STREAM to read messages from a Redis stream.

The connector uses Redis Consumer Groups for guaranteed message delivery and load balancing across Spark partitions or workers.

This provides streaming semantics with at-least-once delivery guarantees for both micro-batch and continuous processing.

spark.redis.read.streamKey

Key of the Redis stream to read from.

spark.redis.read.consumerGroup

Name of the Redis consumer group (default: spark-redis-group).

spark.redis.read.consumerName

Optional explicit consumer name. If not specified, uses partition-based naming: spark-redis-{partitionId}.

spark.redis.read.streamOffset

Initial stream position for the consumer group. Used when creating the consumer group (default: 0).

spark.redis.read.streamBlock

Block timeout in milliseconds for XREADGROUP operations (default: 100).

spark.redis.read.streamCount

Maximum number of messages to fetch in each XREADGROUP call (default: 50).

spark.redis.read.numPartitions

Number of Spark partitions to create for parallel processing. Each partition becomes a separate consumer in the Redis consumer group (default: 1).

spark.redis.read.ackBatchSize

Number of messages to acknowledge in a single batch operation for better throughput (default: 50). Higher values improve throughput but may increase latency before messages are acknowledged.

spark.redis.read.streamTimestamp

When set to true, extracts the timestamp portion from Redis stream message IDs and adds it as a field to the DataFrame (default: false).

spark.redis.read.streamTimestampField

Name of the field to store the extracted timestamp when streamTimestamp is enabled (default: timestamp).

Consumer Group Behavior

When using consumer group mode:

  • Guaranteed Delivery: Each message is delivered to exactly one consumer in the group

  • Load Balancing: Redis automatically distributes messages across available consumers

  • Fault Tolerance: Unacknowledged messages can be reclaimed by other consumers

  • Consumer Naming: Each Spark partition creates a unique consumer with name spark-redis-{partitionId}

  • Automatic Group Creation: The consumer group is created automatically if it doesn’t exist

  • Message Acknowledgment: Messages are acknowledged after successful processing using XACK

Partition Management

The Redis connector supports configurable partitioning for parallel stream processing:

  • Single Partition (default): numPartitions=1 creates one consumer per streaming query

  • Multiple Partitions: numPartitions=N creates N consumers in the same consumer group for parallel processing

  • Load Balancing: Redis automatically distributes messages across multiple consumers

  • Parallel Execution: Each partition runs on a separate Spark executor for true parallelism

To scale processing:

  1. Increase partitions: Set numPartitions to match your available executor cores

  2. Balance throughput vs. latency: more partitions generally yield higher throughput but potentially higher latency due to coordination

  3. Monitor consumer lag: Ensure all consumers are processing messages evenly

Write Options

The Redis Connector for Spark supports writing data from Apache Spark DataFrames to Redis using multiple data types and write modes.

spark.redis.write.pool

Maximum number of Redis connections to use across threads (default: 8).

spark.redis.write.rate

Rate limit for write operations, in operations per second. When set to a value greater than 0, throttling limits the number of write operations per second (default: 0, meaning no rate limiting).

Write Data Types

The connector supports writing to four Redis data types:

spark.redis.write.type

Redis data-structure type to write to: hash, json, string, stream (default: json).

JSON Type (Default)

Uses the Redis JSON.SET command to store entire rows as JSON documents. This is the recommended type for complex nested data structures.

dataFrame.write
  .format("redis")
  .option("type", "json")
  .option("key", "id")
  .option("keyspace", "users")
  .save()

HASH Type

Uses the Redis HSET command where each DataFrame field becomes a hash field. Ideal for flat records with efficient field-level access.

dataFrame.write
  .format("redis")
  .option("type", "hash")
  .option("key", "id")
  .option("keyspace", "users")
  .save()

STRING Type

Uses the Redis SET command to store rows as JSON-serialized strings. Suitable for simple key-value storage.

dataFrame.write
  .format("redis")
  .option("type", "string")
  .option("key", "id")
  .option("keyspace", "cache")
  .save()

STREAM Type

Uses the Redis XADD command to append each row as a stream entry. Perfect for event streaming and time-series data.

dataFrame.write
  .format("redis")
  .option("type", "stream")
  .option("keyspace", "events")
  .save()

Key Configuration

spark.redis.write.keyspace

Prefix for keys written to Redis (default: spark).

spark.redis.write.key

Field or list of fields used to compose keys written to Redis (default: auto-generated). Separate with a comma to specify more than one field, e.g. field1,field2.

spark.redis.write.keySeparator

Separator character for keys (default: :).

For types other than stream, you should specify a key field; otherwise all writes go to the same single key, equal to the keyspace.

Key Generation Examples

Single Field Key

.option("keyspace", "users")
.option("key", "id")
// Generated key: users:123

Multiple Field Key

.option("keyspace", "orders")
.option("key", "year,month,id")
// Generated key: orders:2023:12:12345

Save Modes

The connector supports two primary save modes:

Append Mode (Default)

Adds new data to Redis without clearing existing data. This is the default behavior.

dataFrame.write
  .format("redis")
  .mode("append")  // Default mode
  .save()

Overwrite Mode

Clears the entire keyspace before writing new data. Use with caution as this will delete all matching keys.

dataFrame.write
  .format("redis")
  .mode("overwrite")  // Truncates keyspace first
  .save()

Batch Mode

spark.redis.write.batch

Maximum number of keys to write to Redis in a pipeline (default: 50).

spark.redis.write.scanCount

Number of keys to scan per iteration for keyspace operations (default: 100).

Batch mode is used for standard DataFrame write operations:

dataFrame.write
  .format("redis")
  .option("batch", "100")  // Increase batch size
  .option("type", "hash")
  .option("key", "id")
  .save()

Streaming Mode

The connector supports both micro-batch and continuous processing modes for streaming writes.

For streaming reads, the connector uses Redis Consumer Groups:

Streaming Read Examples

Basic Usage (Single Partition)

# Read using XREADGROUP for guaranteed delivery (default mode)
streamDF = spark.readStream \
  .format("redis") \
  .option("type", "stream") \
  .option("streamKey", "events") \
  .option("consumerGroup", "spark-analytics") \
  .option("streamBlock", "1000") \
  .load()

// Read using XREADGROUP for guaranteed delivery (default mode)
val streamDF = spark.readStream
  .format("redis")
  .option("type", "stream")
  .option("streamKey", "events")
  .option("consumerGroup", "spark-analytics")
  .option("streamBlock", "1000")
  .load()

Parallel Processing (Multiple Partitions)

# Use multiple partitions for parallel processing
streamDF = spark.readStream \
  .format("redis") \
  .option("type", "stream") \
  .option("streamKey", "events") \
  .option("consumerGroup", "spark-analytics") \
  .option("numPartitions", "4") \
  .option("streamBlock", "1000") \
  .load()

// Use multiple partitions for parallel processing
val streamDF = spark.readStream
  .format("redis")
  .option("type", "stream")
  .option("streamKey", "events")
  .option("consumerGroup", "spark-analytics")
  .option("numPartitions", "4")
  .option("streamBlock", "1000")
  .load()

Stream Timestamp Extraction

# Extract timestamps from Redis stream message IDs
streamDF = spark.readStream \
  .format("redis") \
  .option("type", "stream") \
  .option("streamKey", "events") \
  .option("consumerGroup", "spark-analytics") \
  .option("streamTimestamp", "true") \
  .option("streamTimestampField", "event_time") \
  .load()

# The resulting DataFrame will include an 'event_time' field
# containing the timestamp from the Redis stream message ID

// Extract timestamps from Redis stream message IDs
val streamDF = spark.readStream
  .format("redis")
  .option("type", "stream")
  .option("streamKey", "events")
  .option("consumerGroup", "spark-analytics")
  .option("streamTimestamp", "true")
  .option("streamTimestampField", "event_time")
  .load()

// The resulting DataFrame will include an 'event_time' field
// containing the timestamp from the Redis stream message ID

Streaming Write Examples

Micro-batch Processing

streamingDF.writeStream
  .format("redis")
  .option("type", "stream")
  .option("keyspace", "events")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

Continuous Processing

streamingDF.writeStream
  .format("redis")
  .option("type", "json")
  .option("key", "userId")
  .outputMode("append")
  .trigger(Trigger.Continuous("1 second"))
  .start()

Write Operation Behavior

All write operations are upserts: they create new keys or overwrite existing ones. The connector does not support:

  • Explicit insert vs update operations

  • Conditional updates

  • Delete operations (except keyspace truncation in overwrite mode)

Performance Tuning

Batch Size Optimization

For high-throughput scenarios, increase the batch size:

dataFrame.write
  .format("redis")
  .option("batch", "200")  // Increased from default 50
  .save()

Rate Limiting

To protect Redis from being overwhelmed, you can enable rate limiting to control the number of write operations per second:

dataFrame.write
  .format("redis")
  .option("rate", "1000")  // Limit to 1000 operations per second
  .option("type", "hash")
  .option("key", "id")
  .save()

Connection Pooling

The connector automatically manages connection pooling for optimal performance.
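
You can still tune the pool size with the read and write pool options described above. A minimal sketch for a write-heavy job (the value is illustrative):

dataFrame.write
  .format("redis")
  // Allow up to 16 Redis connections across writer threads
  .option("pool", "16")
  .option("type", "hash")
  .option("key", "id")
  .save()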

Error Handling

Keyspace Cleanup

The connector provides a utility method to clean up keyspaces:

WriteConfig config = new WriteConfig(configMap);
int deletedCount = config.deleteKeyspace(logger);

Atomic Operations

Individual batches are atomic: either the entire batch succeeds or it fails. However, there are no transactional guarantees across multiple batches.