Configuration
You can configure read and write operations in both batch and streaming modes. Note that Redis streams only support streaming mode.
Specify Configuration
Using SparkConf
You can specify configuration options with SparkConf using any of the following approaches:
- The SparkConf constructor in your application (see the sketch after this list). To learn more, see the SparkConf documentation for your language.
- The --conf flag at runtime. To learn more, see Dynamically Loading Spark Properties in the Spark documentation.
- The $SPARK_HOME/conf/spark-defaults.conf file.
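For example, a minimal sketch in Scala that sets a connector option through the SparkConf constructor. The application name and Redis URI are placeholders, and the exact key prefix is an assumption based on the option names used later in this section:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Set connector options on SparkConf before building the session.
val conf = new SparkConf()
  .setAppName("redis-spark-example") // hypothetical application name
  .set("spark.redis.read.connection.uri", "redis://localhost:6379") // assumed local Redis URI

val spark = SparkSession.builder().config(conf).getOrCreate()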
Using an Options Map
In the Spark API, the DataFrameReader, DataFrameWriter, DataStreamReader, and DataStreamWriter classes each contain an option() method.
You can use this method to specify options for the underlying read or write operation.
Options specified in this way override any corresponding settings in SparkConf.
Short-Form Syntax
Options maps support short-form syntax. You may omit the prefix when specifying an option key string.
For example, the following syntaxes are equivalent to one another:
- df.write.option("spark.redis.write.type", "hash").save()
- df.write.option("spark.redis.type", "hash").save()
- df.write.option("type", "hash").save()
Connection Options
The following options apply to both read and write operations.
redis.<read|write>.connection.uri
- Redis URI in the form redis://username:password@host:port. For secure (TLS) connections, use rediss://.
redis.<read|write>.connection.cluster
- Set to true when connecting to a Redis Cluster.
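For example, a read from a Redis Cluster might be configured as in the following sketch. The host, credentials, and short-form option keys are assumptions based on the descriptions above:
val df = spark.read
  .format("redis")
  .option("connection.uri", "redis://user:secret@redis.example.com:6379") // placeholder credentials and host
  .option("connection.cluster", "true") // reading from a Redis Cluster
  .load()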
TLS Connection Options
For secure (TLS) connections, use rediss:// as the Redis URI scheme.
redis.<read|write>.connection.ssl.cacert
- Certificate file to load trusted certificates. The file must provide X.509 certificates in PEM format.
redis.<read|write>.connection.ssl.cert
- X.509 certificate chain in PEM format to use for client authentication.
redis.<read|write>.connection.ssl.key
- PKCS#8 private key in PEM format to use for client authentication.
redis.<read|write>.connection.ssl.key.password
- Password for the private key if it’s password-protected.
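A TLS read with client authentication might look like the following sketch. The host, certificate paths, and key password are placeholders, and the short-form option keys are assumed:
val df = spark.read
  .format("redis")
  .option("connection.uri", "rediss://redis.example.com:6379") // TLS scheme; placeholder host
  .option("connection.ssl.cacert", "/path/to/ca.pem") // placeholder certificate paths
  .option("connection.ssl.cert", "/path/to/client-cert.pem")
  .option("connection.ssl.key", "/path/to/client-key.pem")
  .option("connection.ssl.key.password", "changeit") // only needed if the key is password-protected
  .load()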
Read Options
spark.redis.read.type
- Type of reader to use for reading data from Redis (KEYS or STREAM).
spark.redis.read.schema
- Specifies known fields to use when inferring the schema from Redis, in the form <field1> <type>, <field2> <type> where type is one of STRING, TINYINT, SMALLINT, INT, BIGINT, BOOLEAN, FLOAT, DOUBLE, DATE, TIMESTAMP.
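For example, a batch read that hints the schema might look like the following sketch. The field names are hypothetical and the short-form option keys are assumed:
val df = spark.read
  .format("redis")
  .option("type", "KEYS")
  .option("schema", "name STRING, age INT, active BOOLEAN") // hypothetical fields
  .load()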
Redis Keys
With read type KEYS the connector iterates over keys using the Redis SCAN command and then fetches the corresponding values. A configuration sketch follows the options below.
spark.redis.read.keyPattern
- Read keys matching the given glob-style pattern (default: *).
spark.redis.read.keyType
- Read keys matching the given type, e.g. string, hash, json (default: all types).
spark.redis.read.threads
- Number of reader threads to use in parallel (default: 1).
spark.redis.read.batch
- Number of keys whose values each thread fetches at a time in a single pipeline call.
spark.redis.read.pool
- Maximum number of Redis connections to use across threads (default: 8).
spark.redis.read.scanCount
- Number of keys to read at once on each SCAN call.
spark.redis.read.queueCapacity
- Maximum number of values that the reader threads can queue up (default: 10000).
spark.redis.read.readFrom
- Which Redis cluster nodes to read from. See Lettuce ReadFrom.
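The following sketch tunes a KEYS read for parallelism. The key pattern and tuning values are illustrative, and the short-form option keys are assumed:
val df = spark.read
  .format("redis")
  .option("type", "KEYS")
  .option("keyPattern", "user:*") // hypothetical key pattern
  .option("keyType", "hash") // only read hash keys
  .option("threads", "4") // four parallel reader threads
  .option("batch", "100") // fetch 100 values per pipeline call
  .load()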
Streaming
When using Spark streaming, the Redis Connector for Spark supports both micro-batch processing and continuous processing.
In this mode the connector reads a change stream from Redis using keyspace notifications in addition to the scan described previously.
spark.redis.read.eventQueueCapacity
- Capacity of the keyspace notification queue (default: 10000).
spark.redis.read.idleTimeout
- Minimum idle duration in milliseconds before the reader is considered complete.
spark.redis.read.flushInterval
- Maximum duration in milliseconds between flushes.
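A streaming read over keys might look like the following sketch. The key pattern and tuning values are illustrative, and the short-form option keys are assumed:
val streamDF = spark.readStream
  .format("redis")
  .option("type", "KEYS")
  .option("keyPattern", "user:*") // hypothetical key pattern
  .option("eventQueueCapacity", "20000") // larger keyspace notification queue
  .option("flushInterval", "1000") // flush at least once per second
  .load()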
Redis Stream
Use read type STREAM to read messages from a Redis stream.
The connector uses Redis Consumer Groups for guaranteed message delivery and load balancing across Spark partitions or workers.
This provides streaming semantics with at-least-once delivery guarantees for both micro-batch and continuous processing.
spark.redis.read.streamKey
- Key of the Redis stream to read from.
spark.redis.read.consumerGroup
- Name of the Redis consumer group (default: spark-redis-group).
spark.redis.read.consumerName
- Optional explicit consumer name. If not specified, uses partition-based naming: spark-redis-{partitionId}.
spark.redis.read.streamOffset
- Initial stream position for the consumer group. Used when creating the consumer group (default: 0).
spark.redis.read.streamBlock
- Block timeout in milliseconds for XREADGROUP operations (default: 100).
spark.redis.read.streamCount
- Maximum number of messages to fetch in each XREADGROUP call (default: 50).
spark.redis.read.numPartitions
- Number of Spark partitions to create for parallel processing. Each partition becomes a separate consumer in the Redis consumer group (default: 1).
spark.redis.read.ackBatchSize
- Number of messages to acknowledge in a single batch operation for better throughput (default: 50). Higher values improve throughput but may increase latency before messages are acknowledged.
spark.redis.read.streamTimestamp
- When set to true, extracts the timestamp portion from Redis stream message IDs and adds it as a field to the DataFrame (default: false).
spark.redis.read.streamTimestampField
- Name of the field to store the extracted timestamp when streamTimestamp is enabled (default: timestamp).
Consumer Group Behavior
When using consumer group mode:
- Guaranteed Delivery: Each message is delivered to exactly one consumer in the group.
- Load Balancing: Redis automatically distributes messages across available consumers.
- Fault Tolerance: Unacknowledged messages can be reclaimed by other consumers.
- Consumer Naming: Each Spark partition creates a unique consumer with name spark-redis-{partitionId}.
- Automatic Group Creation: The consumer group is created automatically if it doesn’t exist.
- Message Acknowledgment: Messages are acknowledged after successful processing using XACK.
Partition Management
The Redis connector supports configurable partitioning for parallel stream processing:
- Single Partition (default): numPartitions=1 creates one consumer per streaming query.
- Multiple Partitions: numPartitions=N creates N consumers in the same consumer group for parallel processing.
- Load Balancing: Redis automatically distributes messages across multiple consumers.
- Parallel Execution: Each partition runs on a separate Spark executor for true parallelism.
To scale processing:
- Increase partitions: Set numPartitions to match your available executor cores.
- Balance throughput vs. latency: More partitions give higher throughput but potentially higher latency due to coordination.
- Monitor consumer lag: Ensure all consumers are processing messages evenly.
Write Options
The Redis Connector for Spark supports writing data from Apache Spark DataFrames to Redis using multiple data types and write modes.
spark.redis.write.pool
- Maximum number of Redis connections to use across threads (default: 8).
spark.redis.write.rate
- Rate limit for write operations in operations per second. When set to a value greater than 0, enables throttling to limit the number of write operations per second (default: 0, no rate limiting).
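For example, a throttled write might be configured as in the following sketch. The rate value is illustrative and the short-form option keys are assumed:
dataFrame.write
  .format("redis")
  .option("type", "hash")
  .option("key", "id")
  .option("rate", "5000") // limit writes to roughly 5,000 operations per second
  .save()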
Write Data Types
The connector supports writing to four Redis data types:
spark.redis.write.type
- Redis data-structure type to write to: hash, json, string, stream (default: json).
JSON Type (Default)
Uses the Redis JSON.SET command to store entire rows as JSON documents. This is the recommended type for complex nested data structures.
dataFrame.write
.format("redis")
.option("type", "json")
.option("key", "id")
.option("keyspace", "users")
.save()
HASH Type
Uses the Redis HSET command where each DataFrame field becomes a hash field. Ideal for flat records with efficient field-level access.
dataFrame.write
.format("redis")
.option("type", "hash")
.option("key", "id")
.option("keyspace", "users")
.save()
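The string and stream types are configured the same way by swapping the type option. For example, appending rows to a Redis stream might look like the following sketch; the stream name and the use of the keyspace as the stream key are assumptions based on the key configuration described below:
dataFrame.write
  .format("redis")
  .option("type", "stream")
  .option("keyspace", "events") // messages are appended to the stream named "events" (assumed)
  .save()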
Key Configuration
spark.redis.write.keyspace
- Prefix for keys written to Redis (default: spark).
spark.redis.write.key
- Field or list of fields used to compose keys written to Redis (default: auto-generated). Separate with a comma to specify more than one field, e.g. field1,field2.
spark.redis.write.keySeparator
- Separator character for keys (default: :).
For types other than stream, you should specify a key field; otherwise, all writes go to the same single key, equal to the keyspace.
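For example, composing keys from two fields might look like the following sketch. The field names are hypothetical; keys would take the form users:<country>:<id>:
dataFrame.write
  .format("redis")
  .option("type", "hash")
  .option("keyspace", "users")
  .option("key", "country,id") // hypothetical fields used to build each key
  .option("keySeparator", ":")
  .save()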
Save Modes
The connector supports two primary save modes:
Batch Mode
spark.redis.write.batch
- Maximum number of keys to write to Redis in a pipeline (default: 50).
spark.redis.write.scanCount
- Number of keys to scan per iteration for keyspace operations (default: 100).
Batch mode is used for standard DataFrame write operations:
dataFrame.write
.format("redis")
.option("batch", "100") // Increase batch size
.option("type", "hash")
.option("key", "id")
.save()
Streaming Mode
The connector supports both micro-batch and continuous processing modes for streaming writes.
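A streaming write might look like the following sketch, assuming a streaming source DataFrame named streamDF and a placeholder checkpoint path; checkpointLocation is the standard Spark Structured Streaming option:
streamDF.writeStream
  .format("redis")
  .option("type", "hash")
  .option("key", "id")
  .option("keyspace", "events")
  .option("checkpointLocation", "/tmp/redis-checkpoint") // placeholder checkpoint path
  .start()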
For streaming reads, the connector uses Redis Consumer Groups:
Streaming Read Examples
Basic Usage (Single Partition)
# Read using XREADGROUP for guaranteed delivery (default mode)
streamDF = spark.readStream \
.format("redis") \
.option("type", "stream") \
.option("streamKey", "events") \
.option("consumerGroup", "spark-analytics") \
.option("streamBlock", "1000") \
.load()
// Read using XREADGROUP for guaranteed delivery (default mode)
val streamDF = spark.readStream
.format("redis")
.option("type", "stream")
.option("streamKey", "events")
.option("consumerGroup", "spark-analytics")
.option("streamBlock", "1000")
.load()
Parallel Processing (Multiple Partitions)
# Use multiple partitions for parallel processing
streamDF = spark.readStream \
.format("redis") \
.option("type", "stream") \
.option("streamKey", "events") \
.option("consumerGroup", "spark-analytics") \
.option("numPartitions", "4") \
.option("streamBlock", "1000") \
.load()
// Use multiple partitions for parallel processing
val streamDF = spark.readStream
.format("redis")
.option("type", "stream")
.option("streamKey", "events")
.option("consumerGroup", "spark-analytics")
.option("numPartitions", "4")
.option("streamBlock", "1000")
.load()
Stream Timestamp Extraction
# Extract timestamps from Redis stream message IDs
streamDF = spark.readStream \
.format("redis") \
.option("type", "stream") \
.option("streamKey", "events") \
.option("consumerGroup", "spark-analytics") \
.option("streamTimestamp", "true") \
.option("streamTimestampField", "event_time") \
.load()
# The resulting DataFrame will include an 'event_time' field
# containing the timestamp from the Redis stream message ID
// Extract timestamps from Redis stream message IDs
val streamDF = spark.readStream
.format("redis")
.option("type", "stream")
.option("streamKey", "events")
.option("consumerGroup", "spark-analytics")
.option("streamTimestamp", "true")
.option("streamTimestampField", "event_time")
.load()
// The resulting DataFrame will include an 'event_time' field
// containing the timestamp from the Redis stream message ID
Write Operation Behavior
All write operations are upserts - they will create new keys or overwrite existing ones. The connector does not support:
- Explicit insert vs. update operations
- Conditional updates
- Delete operations (except keyspace truncation in overwrite mode)
Performance Tuning
Batch Size Optimization
For high-throughput scenarios, increase the batch size:
dataFrame.write
.format("redis")
.option("batch", "200") // Increased from default 50
.save()