1. Overview

Redis Kafka Connector is a Confluent-verified connector that stores data from Kafka topics into Redis and pushes data from Redis into Kafka topics.

redis kafka connector

This guide provides documentation and usage information across the following topics:

2. Quick Start

This section shows how to configure the Redis Kafka Connector to import/export data between Redis and Apache Kafka and provides a hands-on look at the functionality of the source and sink connectors.

2.1. Requirements

Download and install the following software:

2.2. Start the Sandbox

The sandbox starts the following Docker services:

  • Redis Stack

  • Apache Kafka

  • Kafka Connect with the Redis Kafka Connector installed

To start the sandbox run the following command:

docker compose up

After Docker downloads and starts the services you should see the following output:

[+] Running 8/0
 ✔ Container redis            Created
 ✔ Container zookeeper        Created
 ✔ Container broker           Created
 ✔ Container schema-registry  Created
 ✔ Container rest-proxy       Created
 ✔ Container connect          Created
 ✔ Container ksqldb-server    Created
 ✔ Container control-center   Created

2.3. Add Connectors

Now that the required services are up and running, we can add connectors to Kafka Connect to transfer data between Redis and Kafka:

  • Add a sink connector to transfer data from Kafka to Redis

  • Add a source connector to transfer data from Redis to Kafka

2.3.1. Add a Datagen

Kafka Connect Datagen is a Kafka Connect source connector for generating mock data.

Create the Datagen connector with the following command:

curl -X POST -H "Content-Type: application/json" --data '
  { "name": "datagen-pageviews",
    "config": {
      "connector.class": "io.confluent.kafka.connect.datagen.DatagenConnector",
      "kafka.topic": "pageviews",
      "quickstart": "pageviews",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable": "false",
      "producer.interceptor.classes": "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor",
      "max.interval": 200,
      "iterations": 10000000,
      "tasks.max": "1"
}}' http://localhost:8083/connectors -w "\n"

This automatically creates the Kafka topic pageviews and produces data with a schema configuration from pageviews_schema.avro

Why do I see the message 'Failed to connect'?

It takes up to three minutes for the Kafka Connect REST API to start. If you receive the following error, wait three minutes and run the preceding command again.

curl: (7) Failed to connect to connect port 8083: Connection refused

To confirm that you added the Datagen connector, run the following command:

2.3.2. Add a Sink Connector

The command below adds a Redis Kafka Connector sink connector configured with these properties:

  • The class Kafka Connect uses to instantiate the connector

  • The Kafka topic from which the connector reads data

  • The connection URI of the Redis database to which the connector writes data

  • The Redis command to use for writing data (JSONSET)

  • Key and value converters to correctly handle incoming pageviews data

  • A Single Message Transforms to extract a key from pageviews messages.

curl -X POST -H "Content-Type: application/json" --data '
  {"name": "redis-sink-json",
   "config": {
     "connector.class":"com.redis.kafka.connect.RedisSinkConnector",
     "tasks.max":"1",
     "topics":"pageviews",
     "redis.uri":"redis://redis:6379",
     "redis.command":"JSONSET",
     "key.converter": "org.apache.kafka.connect.json.JsonConverter",
     "value.converter": "org.apache.kafka.connect.storage.StringConverter",
     "value.converter.schemas.enable": "false",
     "transforms": "Cast",
     "transforms.Cast.type": "org.apache.kafka.connect.transforms.Cast$Key",
     "transforms.Cast.spec": "string"
}}' http://localhost:8083/connectors -w "\n"

You can check that Kafka messages are being written to Redis with this command:

docker compose exec redis /opt/redis-stack/bin/redis-cli "keys" "*"

You should see the following output:

  1) "pageviews:6021"
  2) "pageviews:211"
  3) "pageviews:281"
  ...

To retrieve the contents of a specific key use this command:

docker compose exec redis /opt/redis-stack/bin/redis-cli "JSON.GET" "pageviews:1451"

"{\"viewtime\":1451,\"userid\":\"User_6\",\"pageid\":\"Page_35\"}"

2.3.3. Add a Source Connector

The following command adds a source connector configured with these properties:

  • The class Kafka Connect uses to instantiate the connector

  • The connection URI of the Redis database the connector connects to

  • The name of the Redis stream from which the connector reads messages

  • The Kafka topic to which the connector writes data

curl -X POST -H "Content-Type: application/json" --data '
{ "name": "redis-source",
  "config": {
    "tasks.max":"1",
    "connector.class":"com.redis.kafka.connect.RedisStreamSourceConnector",
    "redis.uri":"redis://redis:6379",
    "redis.stream.name":"mystream",
    "topic": "mystream"
  }
}' http://localhost:8083/connectors -w "\n"

Now add a message to the mystream Redis stream:

docker compose exec redis /opt/redis-stack/bin/redis-cli "xadd" "mystream" "*" "field1" "value11" "field2" "value21"

Examine the topics in the Kafka UI: http://localhost:9021 or http://localhost:8000/. The mystream topic should have the previously sent stream message.

2.4. Custom Connector

This section describes configuration aspects that are specific to using Redis Kafka Connector as a Custom Connector in Confluent Cloud.

2.4.1. Egress Endpoints

It is required to specify egress endpoints in order for the connector to reach the Redis database.

2.4.2. Sensitive Properties

The following are sensitive properties that must be marked as such in Confluent Cloud UI.

  • redis.uri: URI of the Redis database to connect to, e.g. redis://redis-12000.redis.com:12000

  • redis.username: Username to use to connect to Redis

  • redis.password: Password to use to connect to Redis

  • redis.key.password: Password of the private key file

2.5. Docker Example

The project repository contains a script that runs all the steps shown in the Quick Start.

Clone the redis-kafka-connect repository and execute run.sh in docker directory:

git clone https://github.com/redis-field-engineering/redis-kafka-connect.git
cd redis-kafka-connect
./run.sh

This will:

  • Run docker compose up

  • Wait for Redis, Kafka, and Kafka Connect to be ready

  • Register the Confluent Datagen Connector

  • Register the Redis Kafka Sink Connector

  • Register the Redis Kafka Source Connector

  • Publish some events to Kafka via the Datagen connector

  • Write the events to Redis

  • Send messages to a Redis stream

  • Write the Redis stream messages back into Kafka

Once running, examine the topics in the Kafka Control Center:

The pageviews topic should contain the 10 simple documents added, each similar to:

{
  "viewtime": {
    "$numberLong": "81"
  },
  "pageid": "Page_1",
  "userid": "User_8"
}
  • The pageviews stream should contain the 10 change events.

Examine the stream in Redis:

docker compose exec redis /usr/local/bin/redis-cli
xread COUNT 10 STREAMS pageviews 0

Messages added to the mystream stream will show up in the mystream topic.

3. Install

Select one of the methods below to install Redis Kafka Connector.

3.1. Download

Download the latest release archive: Releases.

3.2. Confluent Hub

  1. Install the Confluent Hub Client

  2. Install the redis-kafka-connect using the Confluent Hub Client

3.3. Manually

Follow the instructions in Manually Installing Community Connectors.

4. Sink Connector

The Sink Connector consumes records from a Kafka topic and writes the data to Redis.

4.1. Class Name

The sink connector class name is com.redis.kafka.connect.RedisSinkConnector.

The corresponding configuration property would be:

connector.class = com.redis.kafka.connect.RedisSinkConnector

4.2. At least once delivery

The Sink Connector guarantees that records from the Kafka topic are delivered at least once.

4.3. Tasks

The Sink Connector supports running one or more tasks. You can specify the number of tasks with the tasks.max configuration property.

4.4. Redis Client

This section provides information on Redis client configuration.

4.4.1. Redis URI

Specify the Redis URI in the redis.uri property, for example:

redis.uri = redis://redis-12000.redis.com:12000

For complete Redis URI syntax see Redis URI Syntax.

TLS connection URIs start with rediss://.

4.4.2. Certificate Verification

To disable certificate verification for TLS connections use the following property:

redis.insecure = true

4.4.3. Credentials

Username and password can be specified in the URI or separately with the following properties:

redis.username = user1
redis.password = pass

4.5. Redis Data Structures

The Sink Connector supports the following Redis data-structure types as targets:

  • Collections: stream, list, set, sorted set, time series

    Collection keys are generated using the redis.key configuration property which may contain ${topic} (default) as a placeholder for the originating topic name.

    For example with redis.key = ${topic} and topic orders the Redis key is set:orders.

  • Hash, string, JSON

    For other data-structures the key is in the form <keyspace>:<record_key> where keyspace is generated using the redis.key configuration property like above and record_key is the sink record key.

    For example with redis.key = ${topic}, topic orders, and sink record key 123 the Redis key is orders:123.

4.5.1. Hash

Use the following properties to write Kafka records as Redis hashes:

redis.command   = HSET
key.converter   = <string or bytes> (1)
value.converter = <Avro or JSON> (2)
1 String or bytes
2 Avro or JSON. If value is null the key is deleted.

4.5.2. String

Use the following properties to write Kafka records as Redis strings:

redis.command   = SET
key.converter   = <string or bytes> (1)
value.converter = <string or bytes> (2)
1 String or bytes
2 String or bytes. If value is null the key is deleted.

4.5.3. JSON

Use the following properties to write Kafka records as RedisJSON documents:

redis.command   = JSONSET
key.converter   = <string, bytes, or Avro> (1)
value.converter = <string, bytes, or Avro> (2)
1 String, bytes, or Avro
2 String, bytes, or Avro. If value is null the key is deleted.

4.5.4. Stream

Use the following properties to store Kafka records as Redis stream messages:

redis.command   = XADD
redis.key       = <stream key> (1)
value.converter = <Avro or JSON> (2)

4.5.5. List

Use the following properties to add Kafka record keys to a Redis list:

redis.command = <LPUSH or RPUSH> (1)
redis.key     = <key name> (2)
key.converter = <string or bytes> (3)
1 LPUSH or RPUSH
2 List key
3 String or bytes: Kafka record keys to push to the list

The Kafka record value can be any format. If a value is null then the member is removed from the list (instead of pushed to the list).

4.5.6. Set

Use the following properties to add Kafka record keys to a Redis set:

redis.command = SADD
redis.key     = <key name> (1)
key.converter = <string or bytes> (2)
1 Set key
2 String or bytes: Kafka record keys to add to the set

The Kafka record value can be any format. If a value is null then the member is removed from the set (instead of added to the set).

4.5.7. Sorted Set

Use the following properties to add Kafka record keys to a Redis sorted set:

redis.command = ZADD
redis.key     = <key name> (1)
key.converter = <string or bytes> (2)
1 Sorted set key
2 String or bytes: Kafka record keys to add to the set

The Kafka record value should be float64 and is used for the score. If the score is null then the member is removed from the sorted set (instead of added to the sorted set).

4.5.8. Time Series

Use the following properties to write Kafka records as RedisTimeSeries samples:

redis.command = TSADD
redis.key     = <key name> (1)

The Kafka record key must be an integer (e.g. int64) as it is used for the sample time in milliseconds.

The Kafka record value must be a number (e.g. float64) as it is used as the sample value.

4.6. Data Formats

The Sink Connector supports different data formats for record keys and values depending on the target Redis data structure.

4.6.1. Kafka Record Key

The Sink Connector expects Kafka record keys in a specific format depending on the configured target Redis data structure:

Target Record Key Assigned To

Stream

Any

None

Hash

String

Key

String

String or bytes

Key

List

String or bytes

Member

Set

String or bytes

Member

Sorted Set

String or bytes

Member

JSON

String or bytes

Key

TimeSeries

Integer

Sample time in milliseconds

StringConverter

If record keys are already serialized as strings use the StringConverter:

key.converter = org.apache.kafka.connect.storage.StringConverter
ByteArrayConverter

Use the byte array converter to use the binary serialized form of the Kafka record keys:

key.converter = org.apache.kafka.connect.converters.ByteArrayConverter

4.6.2. Kafka Record Value

Multiple data formats are supported for Kafka record values depending on the configured target Redis data structure. Each data structure expects a specific format. If your data in Kafka is not in the format expected for a given data structure, consider using Single Message Transforms to convert to a byte array, string, Struct, or map before it is written to Redis.

Target Record Value Assigned To

Stream

Avro or JSON

Message body

Hash

Avro or JSON

Fields

String

String or bytes

Value

List

Any

Removal if null

Set

Any

Removal if null

Sorted Set

Number

Score or removal if null

JSON

String or bytes

Value

TimeSeries

Number

Sample value

StringConverter

If record values are already serialized as strings, use the StringConverter to store values in Redis as strings:

value.converter = org.apache.kafka.connect.storage.StringConverter
ByteArrayConverter

Use the byte array converter to store the binary serialized form (for example, JSON, Avro, Strings, etc.) of the Kafka record values in Redis as byte arrays:

value.converter = org.apache.kafka.connect.converters.ByteArrayConverter
Avro
value.converter                     = io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url = http://localhost:8081
JSON
value.converter                = org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable = <true|false> (1)
1 Set to true if the JSON record structure has an attached schema

4.7. Configuration

connector.class  = com.redis.kafka.connect.RedisSinkConnector
topics           = <Kafka topic> (1)
redis.uri        = <Redis URI> (2)
redis.command    = <HSET|SET|JSONSET|XADD|RPUSH|SADD|ZADD|TSADD> (3)
key.converter    = <Key converter> (4)
value.converter  = <Value converter> (5)
1 Kafka topics to read messsages from.
2 Redis URI.
3 Redis command.
4 Key converter.
5 Value converter.

5. Stream Source Connector

The Stream Source Connector reads from a Redis stream and publishes messages to a Kafka topic.

5.1. Class Name

The Stream Source Connector class name is com.redis.kafka.connect.RedisStreamSourceConnector.

The corresponding configuration property would be:

connector.class = com.redis.kafka.connect.RedisStreamSourceConnector

5.2. Delivery Guarantees

The Stream Source Connector can be configured to ack stream messages either automatically (at-most-once delivery) or explicitly (at-least-once delivery). The default is at-least-once delivery.

5.2.1. At-Least-Once

In this mode, each stream message is acknowledged after it has been written to the corresponding topic.

redis.stream.delivery = at-least-once

5.2.2. At-Most-Once

In this mode, stream messages are acknowledged as soon as they are read.

redis.stream.delivery = at-most-once

5.3. Tasks

Reading from the stream is done through a consumer group so that multiple instances of the connector configured via the tasks.max can consume messages in a round-robin fashion.

5.4. Redis Client

This section provides information on Redis client configuration.

5.4.1. Redis URI

Specify the Redis URI in the redis.uri property, for example:

redis.uri = redis://redis-12000.redis.com:12000

For complete Redis URI syntax see Redis URI Syntax.

TLS connection URIs start with rediss://.

5.4.2. Certificate Verification

To disable certificate verification for TLS connections use the following property:

redis.insecure = true

5.4.3. Credentials

Username and password can be specified in the URI or separately with the following properties:

redis.username = user1
redis.password = pass

5.5. Message Schema

5.5.1. Key Schema

Keys are of type String and contain the stream message id.

5.5.2. Value Schema

The value schema defines the following fields:

Name Schema Description

id

STRING

Stream message ID

stream

STRING

Stream key

body

Map of STRING

Stream message body

5.5.3. Configuration

connector.class             = com.redis.kafka.connect.RedisStreamSourceConnector
redis.stream.name           = <name> (1)
redis.stream.offset         = <offset> (2)
redis.stream.block          = <millis> (3)
redis.stream.consumer.group = <group> (4)
redis.stream.consumer.name  = <name> (5)
redis.stream.delivery       = <mode> (6)
topic                       = <name> (7)
1 Name of the stream to read from.
2 Message ID to start reading from (default: 0-0).
3 Maximum XREAD wait duration in milliseconds (default: 100).
4 Name of the stream consumer group (default: kafka-consumer-group).
5 Name of the stream consumer (default: consumer-${task}). May contain ${task} as a placeholder for the task id. For example, foo${task} and task 123 ⇒ consumer foo123.
6 Delivery mode: at-least-once, at-most-once (default: at-least-once).
7 Destination topic (default: ${stream}). May contain ${stream} as a placeholder for the originating stream name. For example, redis_${stream} and stream orders ⇒ topic redis_orders.

6. Keys Source Connector

The Keys Source Connector captures changes happening to keys in a Redis database and publishes keys and values to a Kafka topic. The data structure key will be mapped to the record key, and the value will be mapped to the record value.

Make sure the Redis database has keyspace notifications enabled using notify-keyspace-events = KEA in redis.conf or via CONFIG SET. For more details see Redis Keyspace Notifications.

6.1. Class Name

The Keys Source Connector class name is com.redis.kafka.connect.RedisKeysSourceConnector.

The corresponding configuration property would be:

connector.class = com.redis.kafka.connect.RedisKeysSourceConnector

6.2. Delivery Guarantees

The Keys Source Connector does not guarantee data consistency because it relies on Redis keyspace notifications which have no delivery guarantees. It is possible for some notifications to be missed, for example in case of network failures.

Also, depending on the type, size, and rate of change of data structures on the source it is possible the connector cannot keep up with the change stream. For example if a big set is repeatedly updated the connector will need to read the whole set on each update and transfer it over to the target database. With a big-enough set the connector could fall behind and the internal queue could fill up leading up to updates being dropped. Some preliminary sizing using Redis statistics and bigkeys/memkeys is recommended. If you need assistance please contact your Redis account team.

6.3. Tasks

The Keys Source Connector should only be configured with one task as keyspace notifications are broadcast to all listeners and cannot be consumed in a round-robin fashion.

6.4. Redis Client

This section provides information on Redis client configuration.

6.4.1. Redis URI

Specify the Redis URI in the redis.uri property, for example:

redis.uri = redis://redis-12000.redis.com:12000

For complete Redis URI syntax see Redis URI Syntax.

TLS connection URIs start with rediss://.

6.4.2. Certificate Verification

To disable certificate verification for TLS connections use the following property:

redis.insecure = true

6.4.3. Credentials

Username and password can be specified in the URI or separately with the following properties:

redis.username = user1
redis.password = pass

6.5. Configuration

connector.class    = com.redis.kafka.connect.RedisKeysSourceConnector
redis.keys.pattern = <glob> (1)
redis.keys.timeout = <millis> (2)
topic              = <name> (3)
1 Key pattern to subscribe to. This is the key portion of the pattern that will be used to listen to keyspace events. For example foo:* translates to pubsub channel __keyspace@0__:foo:* and will capture changes to keys foo:1, foo:2, etc. See Redis KEYS for pattern details.
2 Idle timeout in millis. Duration after which the connector will stop if no activity is encountered.
3 Name of the destination topic.

7. Resources

7.1. Kafka

7.2. Kafka Connect