Introduction

The Redis Connector for Spark integrates Redis with Apache Spark, supporting both reading data from and writing data to Redis. Within a Spark 3 environment, the connector lets you read data from Redis, manipulate it with Spark operations, and write the results back to Redis or to another system. You can also import data into Redis by reading it from any data source that Spark supports and then writing it to Redis.

The connector has the following system requirements:

  • Spark 3.5.4 is recommended, but versions 3.1 through 3.5 should also work.

  • When choosing a Spark distribution, select one built with Scala 2.12.

  • Redis version 5 or higher.

  • Java 17 or higher (if using Java to run Spark).

Getting Started

Java

Dependency Management

Provide the Spark SQL and Redis Spark Connector dependencies to your dependency management tool.

Gradle
dependencies {
    implementation 'com.redis:redis-spark-connector:0.2.2'
    implementation 'org.apache.spark:spark-sql_2.12:3.5.4'
}
Maven
<dependencies>
  <dependency>
    <groupId>com.redis</groupId>
    <artifactId>redis-spark-connector</artifactId>
    <version>0.2.2</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.5.4</version>
  </dependency>
</dependencies>

Spark Session Configuration

package com.redis.examples;

import org.apache.spark.sql.SparkSession;

public class RedisSparkExample {

  public static void main(String[] args) {
    // Build a local session configured with Redis read and write connection URIs
    SparkSession spark = SparkSession.builder()
      .master("local")
      .appName("RedisSparkExample")
      .config("spark.redis.read.connection.uri", "redis://localhost:6379")
      .config("spark.redis.write.connection.uri", "redis://localhost:6379")
      .getOrCreate();
  }
}

For Redis Spark Connector configuration details, see the Configuring Spark section.

Python

PySpark

This guide describes how to use PySpark with the Redis Spark Connector, but the same approach also works in self-contained Python applications.

When starting pyspark, use one of the following options to add the package to the classpath:

--packages com.redis:redis-spark-connector:0.2.2

Downloads the Redis Spark Connector package using the given Maven coordinates.

--jars path/to/redis-spark-connector-0.2.2.jar

Adds a previously downloaded Redis Spark Connector jar to the classpath.

You can specify --conf option(s) to configure the connector.

PySpark example
pyspark --conf "spark.redis.read.connection.uri=redis://localhost:6379" \
        --conf "spark.redis.write.connection.uri=redis://localhost:6379" \
        --packages com.redis:redis-spark-connector:0.2.2

Python Application

Create a SparkSession object using the same configuration options as before:

from pyspark.sql import SparkSession

spark_session = (
    SparkSession.builder
    .appName("myApp")
    .config("spark.redis.read.connection.uri", "redis://localhost:6379")
    .config("spark.redis.write.connection.uri", "redis://localhost:6379")
    .getOrCreate()
)
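
With the session configured, you can load a DataFrame from Redis. A minimal sketch, assuming your Redis instance already contains data (the available options are described in the Configuration section):

# Read keys from Redis into a DataFrame and show a few rows
df = spark_session.read.format("redis").load()
df.show(5)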

Scala

Spark Shell

When starting the Spark shell, use one of the following options to add the package to the classpath:

--packages com.redis:redis-spark-connector:0.2.2

Downloads the Redis Spark Connector package using the given Maven coordinates.

--jars path/to/redis-spark-connector-0.2.2.jar

Adds a previously downloaded Redis Spark Connector jar to the classpath.

You can specify --conf option(s) to configure the connector.

Spark shell example
spark-shell --conf "spark.redis.read.connection.uri=redis://localhost:6379" \
            --conf "spark.redis.write.connection.uri=redis://localhost:6379" \
            --packages com.redis:redis-spark-connector:0.2.2

Scala Application

Dependency Management

Provide the Spark SQL and Redis Spark Connector dependencies to your dependency management tool.

SBT example
scalaVersion := "2.12.18"
libraryDependencies ++= Seq(
  "com.redis" % "redis-spark-connector" % "0.2.2",
  "org.apache.spark" %% "spark-sql" % "3.5.4"
)
Spark Session Configuration
package com.redis

import org.apache.spark.sql.SparkSession

object RedisSparkExample {
  def main(args: Array[String]): Unit = {
    // Build a local session configured with Redis read and write connection URIs
    val sparkSession = SparkSession.builder()
      .master("local")
      .appName("RedisSparkExample")
      .config("spark.redis.read.connection.uri", "redis://localhost:6379")
      .config("spark.redis.write.connection.uri", "redis://localhost:6379")
      .getOrCreate()
  }
}

Databricks

Spark and Redis together unlock powerful capabilities for data professionals. This guide demonstrates how to integrate these technologies for enhanced analytics, real-time processing, and machine learning applications.

In this hands-on notebook, you’ll learn how to make efficient use of Redis data structures alongside Spark’s distributed computing framework. You’ll see firsthand how to extract data from Redis, process it in Spark, and write results back to Redis for application use. Key topics include:

  1. Setting up the Spark-Redis connector in Databricks

  2. Writing data to Redis from Spark

  3. Reading data from Redis for application access

Import Notebook

You can edit and run this notebook by importing it into your Databricks account. Select Import from any folder’s menu and paste the URL below:

https://github.com/redis-field-engineering/redis-spark-dist/raw/refs/heads/main/redis-spark-notebook.py

Databricks Cluster Setup with Redis Connector

  1. Set up a new Databricks cluster

  2. Go to the cluster’s Libraries section

  3. Select Install New

  4. Choose Maven as your source and click Search Packages

  5. Enter redis-spark-connector and select com.redis:redis-spark-connector:x.y.z

  6. Finalize by clicking Install

Want to explore the connector’s full capabilities? Check the detailed documentation.

Loading Test Data into Spark

In this step, you import a CSV file into your Unity Catalog volume. This is a shortened version of the Import and visualize CSV data notebook.

Replace <catalog_name>, <schema_name>, and <volume_name> with the catalog, schema, and volume names for a Unity Catalog volume. Optionally replace the table_name value with a table name of your choice.

catalog = "<catalog_name>"
schema = "<schema_name>"
volume = "<volume_name>"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
file_name = "baby_names.csv"
table_name = "baby_names"
path_volume = f"/Volumes/{catalog}/{schema}/{volume}"
path_table = f"{catalog}.{schema}"
dbutils.fs.cp(download_url, f"{path_volume}/{file_name}")
df = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",")

Setting Up Redis Cloud Environment

Redis Cloud offers a fully-managed Redis service ideal for this integration. Follow these steps:

  1. Register for a Redis Cloud account

  2. Follow the quickstart guide to create a free tier database

Configuring Spark with Redis Connection Details

  1. From your Redis Cloud database dashboard, find your connection endpoint under Connect. The string follows this pattern: redis://<user>:<pass>@<host>:<port>

  2. In Databricks, open your cluster settings and locate Advanced Options. Under Spark, in the Spark config text area, add your Redis connection string as the value of both the spark.redis.read.connection.uri and spark.redis.write.connection.uri parameters. This configuration applies to all notebooks using this cluster.

Writing Data to Redis

Let’s use the df test data we imported earlier and write it to Redis.

(df.write
    .format("redis")
    .mode("overwrite")
    .option("type", "hash")
    .option("keyspace", "baby")
    .option("key", "First Name")
    .save())

Exploring Your Redis Data

Examine the keys and values you’ve created using RedisInsight, Redis' visual data browser. From your Redis Cloud database dashboard, click RedisInsight and explore the data imported from Spark.

Reading from Redis

We can now read the data back from Redis with the following snippet.

redisDF = spark.read.format("redis").load()
display(redisDF)

Configuration

TLS Connections

This section walks you through the process of implementing TLS/SSL security for communications between Spark and Redis.

Secure connections require proper certificate management for Spark workers. Certificates must be stored in JVM trust and key stores, which can be configured either through Spark’s configuration system or via command-line parameters when submitting Spark jobs.

Trust Store

The trust store is essential for verifying the identity of external systems. It contains certificates that validate the authenticity of connections to other applications, ensuring secure and legitimate communications.

Trust Store Setup
keytool -importcert -trustcacerts \
        -file <path to certificate authority file> \
        -keystore <path to trust store> \
        -storepass <password>

Key Store

While the trust store verifies external systems, the key store maintains your application’s identity credentials. Other systems use these credentials to verify connections from your application, completing the chain of trust.

You can generate your key store using standard cryptographic tools like keytool or openssl.
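
For example, keytool can generate a key pair directly into a new key store. The alias and key parameters below are illustrative; adjust them to your environment.

Key Store Setup
keytool -genkeypair -alias redis-client \
        -keyalg RSA -keysize 2048 \
        -keystore <path to key store> \
        -storepass <password>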

Enable TLS

To establish TLS connections with Redis, you’ll need to specify rediss as the URI scheme.

Sample Redis Connection String
rediss://[username:]password@host:port

Certificate Store Configuration

Table 1. Required JVM System Properties

Property                           Purpose
javax.net.ssl.trustStore           Path to the trust store file
javax.net.ssl.trustStorePassword   Password for the trust store
javax.net.ssl.keyStore             Path to the key store file
javax.net.ssl.keyStorePassword     Password for the key store

Spark Configuration

Spark Configuration Properties
spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore=<Path to trust store> -Djavax.net.ssl.trustStorePassword=<Trust store password> -Djavax.net.ssl.keyStore=<Path to key store> -Djavax.net.ssl.keyStorePassword=<Key store password>

spark.driver.extraJavaOptions=-Djavax.net.ssl.trustStore=<Path to trust store> -Djavax.net.ssl.trustStorePassword=<Trust store password> -Djavax.net.ssl.keyStore=<Path to key store> -Djavax.net.ssl.keyStorePassword=<Key store password>

Command-Line Configuration

./bin/spark-submit --name "<app name>" \
    --master "<Master URL>" \
    --conf "spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore=<Path to trust store> -Djavax.net.ssl.trustStorePassword=<Trust store password> -Djavax.net.ssl.keyStore=<Path to key store> -Djavax.net.ssl.keyStorePassword=<Key store password>" \
    --conf "spark.driver.extraJavaOptions=-Djavax.net.ssl.trustStore=<Path to trust store> -Djavax.net.ssl.trustStorePassword=<Trust store password> -Djavax.net.ssl.keyStore=<Path to key store> -Djavax.net.ssl.keyStorePassword=<Key store password>" \
    sparkApplication.jar

Read Options

spark.redis.read.connection.uri

Redis URI in the form redis://username:password@host:port. For secure (TLS) connections use rediss://.

spark.redis.read.connection.cluster

Set to true when connecting to a Redis Cluster.

spark.redis.read.type

Type of reader to use for reading data from Redis: KEYS or STREAM.

spark.redis.read.schema

Specifies known fields to use when inferring the schema from Redis, in the form <field1> <type>, <field2> <type>, where type is one of STRING, TINYINT, SMALLINT, INT, BIGINT, BOOLEAN, FLOAT, DOUBLE, DATE, or TIMESTAMP.
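
For example, the following sketch reads keys into a DataFrame with an explicit schema. It assumes the short option names follow the pattern used in the Databricks write example above (type, schema); the field names are illustrative.

df = (spark.read
    .format("redis")
    .option("type", "KEYS")
    .option("schema", "name STRING, age INT")
    .load())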

Redis Keys

With read type KEYS the connector iterates over keys using the Redis SCAN command and then fetches the corresponding values.

spark.redis.read.keyPattern

Read keys matching the given glob-style pattern (default: *).

spark.redis.read.keyType

Read keys matching the given type, e.g. STRING, HASH, or JSON (default: all types).

spark.redis.read.threads

Number of reader threads to use in parallel (default: 1).

spark.redis.read.batch

Number of keys whose values each thread fetches at a time in a single pipelined call.

spark.redis.read.pool

Maximum number of Redis connections to use across threads (default: 8).

spark.redis.read.scanCount

Number of keys to read at once on each SCAN call.

spark.redis.read.queueCapacity

Max number of values that the reader threads can queue up (default: 10000).

spark.redis.read.readFrom

Which Redis cluster nodes to read from. See Lettuce ReadFrom.
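
Putting these together, a KEYS read that scans only hash keys matching a prefix and fetches values on several threads might be sketched as follows (short option names assumed, as above).

df = (spark.read
    .format("redis")
    .option("type", "KEYS")
    .option("keyPattern", "user:*")  # glob-style key pattern
    .option("keyType", "HASH")       # only hash keys
    .option("threads", 4)            # parallel reader threads
    .load())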

Streaming

When using Spark streaming, the Redis Spark Connector supports both micro-batch processing and continuous processing.

In this mode the connector reads a change stream from Redis using keyspace notifications, in addition to the scan described previously.

spark.redis.read.eventQueueCapacity

Capacity of the keyspace notification queue (default: 10000).

spark.redis.read.idleTimeout

Minimum idle duration in milliseconds after which the reader is considered complete.

spark.redis.read.flushInterval

Max duration in milliseconds between flushes.
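
A structured-streaming read over keyspace notifications could be sketched as follows; readStream and format are standard Spark Structured Streaming APIs, and the connector options are assumed to use the same short names as above.

stream_df = (spark.readStream
    .format("redis")
    .option("type", "KEYS")
    .option("keyPattern", "user:*")
    .load())

# Echo each micro-batch to the console for inspection
query = stream_df.writeStream.format("console").start()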

Redis Stream

Use read type STREAM to read messages from a Redis stream.

spark.redis.read.streamKey

Key of the Redis stream to read from.

Batch Mode

In batch mode the connector uses the Redis XRANGE command to read messages from the given stream.

spark.redis.read.streamRangeStart

ID to start reading from (default: -).

spark.redis.read.streamRangeEnd

Max ID to read (default: +).

spark.redis.read.streamCount

Max number of messages to read.
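
For example, to read up to 1000 messages from a stream in batch mode (short option names assumed, as above):

messages = (spark.read
    .format("redis")
    .option("type", "STREAM")
    .option("streamKey", "events")
    .option("streamRangeStart", "-")  # from the beginning of the stream
    .option("streamRangeEnd", "+")    # to the latest message
    .option("streamCount", 1000)
    .load())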

Streaming Mode

In streaming mode the connector uses the Redis XREAD command to read messages from the given stream.

spark.redis.read.offset

Initial message ID to read from the stream.

spark.redis.read.streamBlock

Maximum duration in milliseconds that XREAD will wait for messages to be available.

spark.redis.read.streamCount

Maximum number of messages to fetch in each XREAD call.
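
A streaming read from a Redis stream might be sketched like this, starting from the first message and blocking up to 100 milliseconds per XREAD call (short option names assumed).

stream_df = (spark.readStream
    .format("redis")
    .option("type", "STREAM")
    .option("streamKey", "events")
    .option("offset", "0-0")       # start from the first message
    .option("streamBlock", 100)    # XREAD block timeout in milliseconds
    .load())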

Write Options

spark.redis.write.connection.uri

Redis URI in the form redis://username:password@host:port. For secure (TLS) connections use rediss://.

spark.redis.write.connection.cluster

Set to true when connecting to a Redis Cluster.

spark.redis.write.type

Redis data-structure type to write to: HASH, JSON, or STRING.

spark.redis.write.keyspace

Prefix for keys written to Redis (default: spark).

spark.redis.write.key

Field or list of fields used to compose keys written to Redis (default: id). Separate with a comma to specify more than one field, e.g. field1,field2.

Batch Mode

spark.redis.write.batch

Maximum number of keys to write to Redis in a pipeline.
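
Combining these options, a batch write of hashes keyed on an id field and pipelined 50 keys at a time might look like this (short option names assumed, matching the Databricks example earlier).

(df.write
    .format("redis")
    .mode("append")
    .option("type", "HASH")
    .option("keyspace", "user")  # key prefix
    .option("key", "id")
    .option("batch", 50)         # pipeline size
    .save())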

Streaming Mode

Writing in streaming mode has no additional connector-specific properties.
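
A streaming write therefore uses the same connector options as a batch write, plus Spark's standard checkpointing. A minimal sketch (checkpointLocation is a standard Structured Streaming option, not a connector option):

query = (stream_df.writeStream
    .format("redis")
    .option("checkpointLocation", "/tmp/redis-checkpoint")
    .option("type", "HASH")
    .option("keyspace", "events")
    .option("key", "id")
    .start())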

Support

Redis Spark Connector is supported by Redis, Inc. for enterprise-tier customers as a 'Developer Tool' under the Redis Software Support Policy. For non-enterprise-tier customers, support is provided on a good-faith basis. To report bugs, request features, or receive assistance, please file an issue.

License

Redis Spark Connector is licensed under the Business Source License 1.1.

Copyright © 2024 Redis, Inc.

See LICENSE for details.