Introduction
The Redis Connector for Spark integrates Redis with Apache Spark, supporting both reading data from and writing data to Redis. Within a Spark 3 environment, the connector lets you read data from Redis, manipulate it with Spark operations, and write the results back to Redis or to another system. You can also import data into Redis by reading it from any data source Spark supports and then writing it to Redis.
The connector has the following system requirements:
- Spark 3.5.4 is recommended, but versions 3.1 through 3.5 should work as well.
- A Spark distribution that uses Scala 2.12.
- Redis version 5 or higher.
- Java 17 or higher (if using Java to run Spark).
Getting Started
Java
Dependency Management
Provide the Spark SQL and Redis Spark Connector dependencies to your dependency management tool. For Gradle:
dependencies {
    implementation 'com.redis:redis-spark-connector:0.2.2'
    implementation 'org.apache.spark:spark-sql_2.12:3.5.4'
}
For Maven:
<dependencies>
    <dependency>
        <groupId>com.redis</groupId>
        <artifactId>redis-spark-connector</artifactId>
        <version>0.2.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.5.4</version>
    </dependency>
</dependencies>
Spark Session Configuration
package com.redis.examples;

import org.apache.spark.sql.SparkSession;

public class RedisSparkExample {
    public static void main(String[] args) throws Exception {
        // Configure the connector's read and write connection URIs on the session.
        SparkSession spark = SparkSession.builder()
                .master("local")
                .appName("RedisSparkExample")
                .config("spark.redis.read.connection.uri", "redis://localhost:6379")
                .config("spark.redis.write.connection.uri", "redis://localhost:6379")
                .getOrCreate();
    }
}
For Redis Spark Connector configuration details, see the Configuring Spark section.
Python
PySpark
This guide describes how to use PySpark with the Redis Spark Connector, but the same approach works for self-contained Python applications.
When starting pyspark, you must use one of the following options to add the package to the classpath:
- --packages com.redis:redis-spark-connector:0.2.2 downloads the Redis Spark Connector package using the given Maven coordinates, or
- --jars path/to/redis-spark-connector-0.2.2.jar adds the downloaded Redis Spark Connector jar to the classpath.
You can specify --conf option(s) to configure the connector.
pyspark --conf "spark.redis.read.connection.uri=redis://localhost:6379" \
--conf "spark.redis.write.connection.uri=redis://localhost:6379" \
--packages com.redis:redis-spark-connector:0.2.2
Python Application
Create a SparkSession object using the same configuration options as before:
from pyspark.sql import SparkSession

spark_session = (
    SparkSession.builder
    .appName("myApp")
    .config("spark.redis.read.connection.uri", "redis://localhost:6379")
    .config("spark.redis.write.connection.uri", "redis://localhost:6379")
    .getOrCreate()
)
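With the session in place, writing a DataFrame to Redis follows the same pattern shown in the Databricks section below. A minimal sketch, assuming a small illustrative DataFrame and the hash write options documented under Write Options (the people keyspace and id key field are placeholders):

# Hypothetical sample data; the "keyspace" option prefixes the generated keys
# and the "key" option names the field used to compose each key.
df = spark_session.createDataFrame(
    [(1, "Ada"), (2, "Grace")],
    ["id", "name"],
)
df.write.format("redis") \
    .mode("overwrite") \
    .option("type", "hash") \
    .option("keyspace", "people") \
    .option("key", "id") \
    .save()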
Scala
Spark Shell
When starting the Spark shell you must use one of the following options to add the package to the classpath:
- --packages com.redis:redis-spark-connector:0.2.2 downloads the Redis Spark Connector package using the given Maven coordinates, or
- --jars path/to/redis-spark-connector-0.2.2.jar adds the downloaded Redis Spark Connector jar to the classpath.
You can specify --conf option(s) to configure the connector.
spark-shell --conf "spark.redis.read.connection.uri=redis://localhost:6379" \
--conf "spark.redis.write.connection.uri=redis://localhost:6379" \
--packages com.redis:redis-spark-connector:0.2.2
Scala Application
Dependency Management
Provide the Spark SQL and Redis Spark Connector dependencies to your dependency management tool. For sbt, in build.sbt:
scalaVersion := "2.12.18" // any Scala 2.12.x release

libraryDependencies ++= Seq(
  "com.redis" %% "redis-spark-connector" % "0.2.2",
  "org.apache.spark" %% "spark-sql" % "3.5.4"
)
Spark Session Configuration
package com.redis

import org.apache.spark.sql.SparkSession

object RedisSparkExample {
  def main(args: Array[String]): Unit = {
    // Configure the connector's read and write connection URIs on the session.
    val sparkSession = SparkSession.builder()
      .master("local")
      .appName("RedisSparkExample")
      .config("spark.redis.read.connection.uri", "redis://localhost:6379")
      .config("spark.redis.write.connection.uri", "redis://localhost:6379")
      .getOrCreate()
  }
}
Databricks
Spark and Redis together unlock powerful capabilities for data professionals. This guide demonstrates how to integrate these technologies for enhanced analytics, real-time processing, and machine learning applications.
In this hands-on notebook, you’ll learn how to make efficient use of Redis data structures alongside Spark’s distributed computing framework. You’ll see firsthand how to extract data from Redis, process it in Spark, and write results back to Redis for application use. Key topics include:
- Setting up the Spark-Redis connector in Databricks
- Writing data to Redis from Spark
- Reading data from Redis for application access
Import Notebook
You can edit and run this notebook by importing it into your Databricks account. Select Import from any folder’s menu and paste the URL below:
https://github.com/redis-field-engineering/redis-spark-dist/raw/refs/heads/main/redis-spark-notebook.py
Databricks Cluster Setup with Redis Connector
- Set up a new Databricks cluster
- Go to the cluster’s Libraries section
- Select Install New
- Choose Maven as your source and click Search Packages
- Enter redis-spark-connector and select com.redis:redis-spark-connector:x.y.z
- Finalize by clicking Install
Want to explore the connector’s full capabilities? Check the detailed documentation.
Loading Test Data into Spark
In this step, you import a CSV file into your Unity Catalog volume. This is a shortened version of the Import and visualize CSV data notebook.
Replace <catalog_name>, <schema_name>, and <volume_name> with the catalog, schema, and volume names for a Unity Catalog volume. Optionally, replace the table_name value with a table name of your choice.
catalog = "<catalog_name>"
schema = "<schema_name>"
volume = "<volume_name>"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
file_name = "baby_names.csv"
table_name = "baby_names"
path_volume = f"/Volumes/{catalog}/{schema}/{volume}"
path_table = f"{catalog}.{schema}"
dbutils.fs.cp(download_url, f"{path_volume}/{file_name}")
df = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",")
Setting Up Redis Cloud Environment
Redis Cloud offers a fully managed Redis service that is ideal for this integration. Follow these steps:
- Register for a Redis Cloud account
- Follow the quickstart guide to create a free tier database
Configuring Spark with Redis Connection Details
- From your Redis Cloud database dashboard, find your connection endpoint under Connect. The string follows this pattern: redis://<user>:<pass>@<host>:<port>
- In Databricks, open your cluster settings and locate Advanced Options. Under Spark, in the Spark config text area, add your Redis connection string as both spark.redis.read.connection.uri redis://… and spark.redis.write.connection.uri redis://… parameters. This configuration applies to all notebooks using this cluster.
Writing Data to Redis
Let’s use the df test data we imported earlier and write it to Redis.
df.write.format("redis") \
    .mode("overwrite") \
    .option("type", "hash") \
    .option("keyspace", "baby") \
    .option("key", "First Name") \
    .save()
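To read the data back for application access, here is a minimal sketch. It assumes the read options documented under Read Options below accept the same short .option() names as the write options above:

# Scan the keys written above under the "baby" keyspace and load them
# as a DataFrame.
df_from_redis = spark.read.format("redis") \
    .option("type", "KEYS") \
    .option("keyPattern", "baby:*") \
    .load()
display(df_from_redis)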
Configuration
TLS Connections
This section walks you through the process of implementing TLS/SSL security for communications between Spark and Redis.
Secure connections require proper certificate management for Spark workers. These certificates must be stored in JVM trust and key stores, which can be configured either through Spark’s configuration system or via command-line parameters when initiating Spark jobs.
Trust Store
The trust store is essential for verifying the identity of external systems. It contains certificates that validate the authenticity of connections to other applications, ensuring secure and legitimate communications.
keytool -importcert -trustcacerts \
-file <path to certificate authority file> \
-keystore <path to trust store> \
-storepass <password>
Key Store
While the trust store verifies external systems, the key store maintains your application’s identity credentials. Other systems use these credentials to verify connections from your application, completing the chain of trust.
Enable TLS
To establish TLS connections with Redis, you’ll need to specify rediss as the URI scheme.
rediss://[username:]password@host:port
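For example, a PySpark session pointed at a TLS-enabled Redis endpoint (host, port, and credentials below are placeholders):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("RedisTLSExample")
    # The rediss:// scheme enables TLS; the certificates themselves come from
    # the JVM trust and key stores configured below.
    .config("spark.redis.read.connection.uri", "rediss://user:secret@redis.example.com:6379")
    .config("spark.redis.write.connection.uri", "rediss://user:secret@redis.example.com:6379")
    .getOrCreate()
)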
Certificate Store Configuration
Property | Purpose
---|---
javax.net.ssl.trustStore | Path to the trust store file
javax.net.ssl.trustStorePassword | Password for the trust store
javax.net.ssl.keyStore | Path to the key store file
javax.net.ssl.keyStorePassword | Password for the key store
Spark Configuration
spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore=<Path to trust store> -Djavax.net.ssl.trustStorePassword=<Trust store password> -Djavax.net.ssl.keyStore=<Path to key store> -Djavax.net.ssl.keyStorePassword=<Key store password>
spark.driver.extraJavaOptions=-Djavax.net.ssl.trustStore=<Path to trust store> -Djavax.net.ssl.trustStorePassword=<Trust store password> -Djavax.net.ssl.keyStore=<Path to key store> -Djavax.net.ssl.keyStorePassword=<Key store password>
Command-Line Configuration
./bin/spark-submit --name "<app name>" \
--master "<Master URL>" \
--conf "spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore=<Path to trust store> -Djavax.net.ssl.trustStorePassword=<Trust store password> -Djavax.net.ssl.keyStore=<Path to key store> -Djavax.net.ssl.keyStorePassword=<Key store password>" \
--conf "spark.driver.extraJavaOptions=-Djavax.net.ssl.trustStore=<Path to trust store> -Djavax.net.ssl.trustStorePassword=<Trust store password> -Djavax.net.ssl.keyStore=<Path to key store> -Djavax.net.ssl.keyStorePassword=<Key store password>" \
sparkApplication.jar
Read Options
- spark.redis.read.connection.uri: Redis URI in the form redis://username:password@host:port. For secure (TLS) connections use rediss://.
- spark.redis.read.connection.cluster: Set to true when connecting to a Redis Cluster.
- spark.redis.read.type: Type of reader to use for reading data from Redis (KEYS or STREAM).
- spark.redis.read.schema: Specifies known fields to use when inferring the schema from Redis, in the form <field1> <type>, <field2> <type>, where type is one of STRING, TINYINT, SMALLINT, INT, BIGINT, BOOLEAN, FLOAT, DOUBLE, DATE, TIMESTAMP.
Redis Keys
With read type KEYS, the connector iterates over keys using the Redis SCAN command and then fetches the corresponding values.
- spark.redis.read.keyPattern: Read keys matching the given glob-style pattern (default: *).
- spark.redis.read.keyType: Read keys matching the given type, e.g. STRING, HASH, JSON (default: all types).
- spark.redis.read.threads: Number of reader threads to use in parallel (default: 1).
- spark.redis.read.batch: Number of keys each thread fetches values for at a time in a pipelined call.
- spark.redis.read.pool: Maximum number of Redis connections to use across threads (default: 8).
- spark.redis.read.scanCount: Number of keys to read at once on each SCAN call.
- spark.redis.read.queueCapacity: Maximum number of values that the reader threads can queue up (default: 10000).
- spark.redis.read.readFrom: Which Redis Cluster nodes to read from. See Lettuce ReadFrom.
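As an illustration, here is a batch read of hash keys using these options. This is a sketch that assumes the spark.redis.read.* options can also be passed as short .option() names, mirroring the write example in the Databricks section (the user:* pattern is a placeholder):

# Scan keys matching user:* and fetch their hash values with 4 reader threads.
df = spark.read.format("redis") \
    .option("type", "KEYS") \
    .option("keyPattern", "user:*") \
    .option("keyType", "HASH") \
    .option("threads", "4") \
    .load()
df.show()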
Streaming
When using Spark streaming, the Redis Spark Connector supports both micro-batch and continuous processing. In this mode, the connector reads a change stream from Redis using keyspace notifications, in addition to the scan described previously.
- spark.redis.read.eventQueueCapacity: Capacity of the keyspace notification queue (default: 10000).
- spark.redis.read.idleTimeout: Minimum duration in milliseconds of inactivity before the reader is considered complete.
- spark.redis.read.flushInterval: Maximum duration in milliseconds between flushes.
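A micro-batch streaming sketch under the same short-option-name assumption as above, writing changed keys to the console (the sink choice and key pattern are illustrative):

# Continuously pick up changes to matching keys via keyspace notifications.
changes = spark.readStream.format("redis") \
    .option("type", "KEYS") \
    .option("keyPattern", "user:*") \
    .load()

query = changes.writeStream \
    .format("console") \
    .outputMode("append") \
    .start()
query.awaitTermination()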
Redis Stream
Use read type STREAM to read messages from a Redis stream.
- spark.redis.read.streamKey: Key of the Redis stream to read from.
Batch Mode
In batch mode the connector uses the Redis XRANGE command to read messages from the given stream.
- spark.redis.read.streamRangeStart: ID to start reading from (default: -).
- spark.redis.read.streamRangeEnd: Maximum ID to read (default: +).
- spark.redis.read.streamCount: Maximum number of messages to read.
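For example, reading a whole stream in one batch (a sketch; the events stream key and the short option names are assumptions):

# XRANGE from the first ID (-) to the last (+) on the "events" stream.
df = spark.read.format("redis") \
    .option("type", "STREAM") \
    .option("streamKey", "events") \
    .option("streamRangeStart", "-") \
    .option("streamRangeEnd", "+") \
    .load()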
Streaming Mode
In streaming mode the connector uses the Redis XREAD command to read messages from the given stream.
- spark.redis.read.offset: Initial message ID to read from the stream.
- spark.redis.read.streamBlock: Maximum duration in milliseconds that XREAD will wait for messages to become available.
- spark.redis.read.streamCount: Maximum number of messages to fetch in each XREAD call.
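And a streaming counterpart (again a sketch; the stream key, starting offset, and short option names are assumptions):

# Tail the "events" stream from its beginning, blocking up to 100 ms per XREAD.
messages = spark.readStream.format("redis") \
    .option("type", "STREAM") \
    .option("streamKey", "events") \
    .option("offset", "0-0") \
    .option("streamBlock", "100") \
    .load()

messages.writeStream.format("console").start().awaitTermination()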
Write Options
- spark.redis.write.connection.uri: Redis URI in the form redis://username:password@host:port. For secure (TLS) connections use rediss://.
- spark.redis.write.connection.cluster: Set to true when connecting to a Redis Cluster.
- spark.redis.write.type: Redis data-structure type to write to: HASH, JSON, or STRING.
- spark.redis.write.keyspace: Prefix for keys written to Redis (default: spark).
- spark.redis.write.key: Field or list of fields used to compose keys written to Redis (default: id). Separate with a comma to specify more than one field, e.g. field1,field2.
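Putting these together, here is a sketch that writes JSON documents keyed by two fields (the DataFrame and field names are illustrative):

# Keys are prefixed with the "orders" keyspace and composed from the
# region and orderId fields of each row.
df.write.format("redis") \
    .mode("overwrite") \
    .option("type", "JSON") \
    .option("keyspace", "orders") \
    .option("key", "region,orderId") \
    .save()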
Support
Redis Spark Connector is supported by Redis, Inc. for enterprise-tier customers as a 'Developer Tool' under the Redis Software Support Policy. For non-enterprise-tier customers, we provide support for Redis Spark Connector on a good-faith basis. To report bugs, request features, or receive assistance, please file an issue.
License
Redis Spark Connector is licensed under the Business Source License 1.1.
Copyright © 2024 Redis, Inc.
See LICENSE for details.