1. Introduction

The Redis Kafka Connector imports and exports data between Apache Kafka and Redis.


This guide covers installation, connecting to Redis, the sink connector, the source connectors, and a quick start with Docker.

2. Install

Select one of the methods below to install Redis Kafka Connector.

2.1. Download

Download the latest release archive from the releases page.

2.2. Confluent Hub

  1. Install the Confluent Hub Client

  2. Install redis-kafka-connect using the Confluent Hub Client (see the example below)
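
The Confluent Hub coordinate below is illustrative; use the owner, component name, and version shown on the connector's Confluent Hub page for your release:

confluent-hub install redis/redis-kafka-connect:latest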

2.3. Manually

Follow the instructions in Manually Installing Community Connectors.

3. Connect to Redis

This section provides information on configuring the Redis Kafka Source and Sink Connectors.

Specify the Redis URI in the redis.uri property, for example:

redis.uri=redis://redis-12000.redis.com:12000

For Redis URI syntax see Redis URI Syntax.

TLS connection URIs start with rediss://. To disable certificate verification for TLS connections, use the following property:

redis.insecure=true

Username and password can be specified in the URI or separately with the following properties:

redis.username=user1
redis.password=pass
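
Credentials can also be embedded in the URI itself. For example, a TLS connection to a password-protected database (the host, port, and credentials below are placeholders) could be configured as:

redis.uri=rediss://user1:pass@redis-12000.redis.com:12000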

4. Sink Connector Guide

The sink connector consumes records from a Kafka topic and writes the data to Redis. It includes the following features:

4.1. At least once delivery

The sink connector guarantees that records from the Kafka topic are delivered at least once.

4.2. Multiple tasks

The sink connector supports running one or more tasks. You can specify the number of tasks with the tasks.max configuration property.
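
For example, to run up to three sink tasks in parallel:

tasks.max=3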

4.3. Redis Data Structures

The sink connector supports the following Redis data-structure types as targets:

  • Collections: stream, list, set, sorted set, time series

    Collection keys are generated using the redis.key configuration property which may contain ${topic} (default) as a placeholder for the originating topic name.

    For example with redis.key = ${topic} and topic orders the Redis key is orders.

  • Hash, string, JSON

    For the other data structures the key is in the form <keyspace>:<record_key>, where keyspace is generated from the redis.key configuration property as above and record_key is the sink record key.

    For example with redis.key = ${topic}, topic orders, and sink record key 123 the Redis key is orders:123.

4.3.1. Hash

Use the following properties to write Kafka records as Redis hashes:

redis.command=HSET
key.converter=<string or bytes> (1)
value.converter=<Avro or JSON> (2)
1 String or bytes
2 Avro or JSON. If value is null the key is deleted.
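
Putting it together, a minimal sketch of a sink configuration that writes JSON record values from an orders topic into Redis hashes might look like the following. The connector class name and Redis URI are assumptions; use the class name shipped with your release and the address of your database.

connector.class=com.redis.kafka.connect.RedisSinkConnector
topics=orders
redis.uri=redis://localhost:6379
redis.command=HSET
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

With this configuration, a record with key 123 on the orders topic is written to the hash orders:123, with the top-level attributes of the JSON value as hash fields.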

4.3.2. String

Use the following properties to write Kafka records as Redis strings:

redis.command=SET
key.converter=<string or bytes> (1)
value.converter=<string or bytes> (2)
1 String or bytes
2 String or bytes. If value is null the key is deleted.

4.3.3. JSON

Use the following properties to write Kafka records as RedisJSON documents:

redis.command=JSONSET
key.converter=<string, bytes, or Avro> (1)
value.converter=<string, bytes, or Avro> (2)
1 String, bytes, or Avro
2 String, bytes, or Avro. If value is null the key is deleted.

4.3.4. Stream

Use the following properties to store Kafka records as Redis stream messages:

redis.command=XADD
redis.key=<stream key> (1)
value.converter=<Avro or JSON> (2)
1 Stream key
2 Avro or JSON: message body
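
For example, a sketch that appends each record to a stream named after its originating topic (the converter choice here is an assumption):

redis.command=XADD
redis.key=${topic}
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false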

4.3.5. List

Use the following properties to add Kafka record keys to a Redis list:

redis.command=<LPUSH or RPUSH> (1)
redis.key=<key name> (2)
key.converter=<string or bytes> (3)
1 LPUSH or RPUSH
2 List key
3 String or bytes: Kafka record keys to push to the list

The Kafka record value can be any format. If a value is null then the member is removed from the list (instead of pushed to the list).

4.3.6. Set

Use the following properties to add Kafka record keys to a Redis set:

redis.command=SADD
redis.key=<key name> (1)
key.converter=<string or bytes> (2)
1 Set key
2 String or bytes: Kafka record keys to add to the set

The Kafka record value can be any format. If a value is null then the member is removed from the set (instead of added to the set).

4.3.7. Sorted Set

Use the following properties to add Kafka record keys to a Redis sorted set:

redis.command=ZADD
redis.key=<key name> (1)
key.converter=<string or bytes> (2)
1 Sorted set key
2 String or bytes: Kafka record keys to add to the sorted set

The Kafka record value should be float64 and is used for the score. If the score is null then the member is removed from the sorted set (instead of added to the sorted set).
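
For example, a sketch using string record keys as members and double record values as scores (the key name leaderboard and the DoubleConverter choice are illustrative):

redis.command=ZADD
redis.key=leaderboard
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.DoubleConverter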

4.3.8. Time Series

Use the following properties to write Kafka records as RedisTimeSeries samples:

redis.command=TSADD
redis.key=<key name> (1)

The Kafka record key must be an integer (e.g. int64) as it is used for the sample time in milliseconds.

The Kafka record value must be a number (e.g. float64) as it is used as the sample value.
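
A minimal sketch, assuming the built-in LongConverter and DoubleConverter for the millisecond timestamps and numeric values (the key name sensor:1 is illustrative):

redis.command=TSADD
redis.key=sensor:1
key.converter=org.apache.kafka.connect.converters.LongConverter
value.converter=org.apache.kafka.connect.converters.DoubleConverter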

4.4. Data Formats

The sink connector supports different data formats for record keys and values depending on the target Redis data structure.

4.4.1. Kafka Record Keys

The sink connector expects Kafka record keys in a specific format depending on the configured target Redis data structure:

Target       Record Key        Assigned To
Stream       Any               None
Hash         String            Key
String       String or bytes   Key
List         String or bytes   Member
Set          String or bytes   Member
Sorted Set   String or bytes   Member
JSON         String or bytes   Key
TimeSeries   Integer           Sample time in milliseconds

StringConverter

If record keys are already serialized as strings, use the StringConverter:

key.converter=org.apache.kafka.connect.storage.StringConverter

ByteArrayConverter

Use the byte array converter to use the Kafka record keys in their binary serialized form:

key.converter=org.apache.kafka.connect.converters.ByteArrayConverter

4.4.2. Kafka Record Values

Multiple data formats are supported for Kafka record values depending on the configured target Redis data structure. Each data structure expects a specific format. If your data in Kafka is not in the format expected for a given data structure, consider using Single Message Transformations to convert to a byte array, string, Struct, or map before it is written to Redis.

Target       Record Value      Assigned To
Stream       Avro or JSON      Message body
Hash         Avro or JSON      Fields
String       String or bytes   Value
List         Any               Removal if null
Set          Any               Removal if null
Sorted Set   Number            Score or removal if null
JSON         String or bytes   Value
TimeSeries   Number            Sample value

StringConverter

If record values are already serialized as strings, use the StringConverter to store values in Redis as strings:

value.converter=org.apache.kafka.connect.storage.StringConverter

ByteArrayConverter

Use the byte array converter to store the binary serialized form (for example, JSON, Avro, Strings, etc.) of the Kafka record values in Redis as byte arrays:

value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

Avro

value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

JSON

value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=<true|false> (1)
1 Set to true if the JSON record structure has an attached schema

5. Source Connector Guide

The Redis Kafka Connector includes two source connectors: stream and keys.

5.1. Stream

The stream source connector reads from a Redis stream and publishes messages to a Kafka topic. It includes the following features:

5.1.1. Delivery Guarantees

The stream source connector can be configured to acknowledge stream messages either automatically (at-most-once delivery) or explicitly (at-least-once delivery). The default is at-least-once delivery.

At-Least-Once

In this mode, each stream message is acknowledged after it has been written to the corresponding topic.

redis.stream.delivery=at-least-once

At-Most-Once

In this mode, stream messages are acknowledged as soon as they are read.

redis.stream.delivery=at-most-once

5.1.2. Multiple Tasks

Reading from the stream is done through a consumer group, so that multiple connector tasks (configured via the tasks.max property) can consume messages in a round-robin fashion.

5.1.3. Message Schema

Key Schema

Keys are of type String and contain the stream message ID.

Value Schema

The value schema defines the following fields:

Name     Schema          Description
id       STRING          Stream message ID
stream   STRING          Stream key
body     Map of STRING   Stream message body

5.1.4. Configuration

connector.class=com.redis.kafka.connect.RedisStreamSourceConnector
redis.stream.name=<name> (1)
redis.stream.offset=<offset> (2)
redis.stream.block=<millis> (3)
redis.stream.consumer.group=<group> (4)
redis.stream.consumer.name=<name> (5)
redis.stream.delivery=<mode> (6)
topic=<name> (7)
1 Name of the stream to read from.
2 Message ID to start reading from (default: 0-0).
3 Maximum XREAD wait duration in milliseconds (default: 100).
4 Name of the stream consumer group (default: kafka-consumer-group).
5 Name of the stream consumer (default: consumer-${task}). May contain ${task} as a placeholder for the task id. For example, foo${task} and task 123 ⇒ consumer foo123.
6 Delivery mode: at-least-once, at-most-once (default: at-least-once).
7 Destination topic (default: ${stream}). May contain ${stream} as a placeholder for the originating stream name. For example, redis_${stream} and stream orders ⇒ topic redis_orders.
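
For example, a sketch that reads the Redis stream orders and publishes its messages to the Kafka topic redis_orders (the URI, stream name, and topic pattern are illustrative):

connector.class=com.redis.kafka.connect.RedisStreamSourceConnector
redis.uri=redis://localhost:6379
redis.stream.name=orders
redis.stream.delivery=at-least-once
topic=redis_${stream}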

5.2. Keys

The keys source connector captures changes happening to keys in a Redis database and publishes keys and values to a Kafka topic. The data structure key will be mapped to the record key, and the value will be mapped to the record value.

Make sure the Redis database has keyspace notifications enabled using notify-keyspace-events = KA in redis.conf or via CONFIG SET. For more details see Redis Keyspace Notifications.
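
For example, notifications can be enabled at runtime with redis-cli (this setting does not persist across a restart unless it is also added to redis.conf):

redis-cli CONFIG SET notify-keyspace-events KA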

The keys source connector does not guarantee data consistency because it relies on Redis keyspace notifications which have no delivery guarantees. It is possible for some notifications to be missed, for example in case of network failures.

Also, depending on the type, size, and rate of change of data structures on the source, the connector may not be able to keep up with the change stream. For example, if a large set is repeatedly updated, the connector has to read the whole set on each update and transfer it to Kafka. With a large enough set the connector can fall behind, its internal queue can fill up, and updates can be dropped. Some preliminary sizing using Redis statistics and bigkeys/memkeys is recommended. If you need assistance, please contact your Redis account team.

5.2.1. Configuration

connector.class=com.redis.kafka.connect.RedisKeysSourceConnector
redis.keys.pattern=<glob> (1)
redis.keys.timeout=<millis> (2)
topic=<name> (3)
1 Key pattern to subscribe to. This is the key portion of the pattern that will be used to listen to keyspace events. For example foo:* translates to pubsub channel __keyspace@0__:foo:* and will capture changes to keys foo:1, foo:2, etc. See Redis KEYS for pattern details.
2 Idle timeout in milliseconds. Duration after which the connector stops if no activity is encountered.
3 Name of the destination topic.
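
For example, a sketch that captures changes to keys matching orders:* and publishes them to a Kafka topic named orders-changes (the URI, pattern, timeout, and topic name are illustrative):

connector.class=com.redis.kafka.connect.RedisKeysSourceConnector
redis.uri=redis://localhost:6379
redis.keys.pattern=orders:*
redis.keys.timeout=30000
topic=orders-changes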

6. Quick Start with Docker

This section provides a hands-on look at the functionality of the Redis Kafka Source and Sink Connectors:

  • The redis-sink connector reads data from a Kafka topic and writes it to a Redis stream

  • The redis-source connector reads data from a Redis stream and writes it to a Kafka topic

6.1. Requirements

Running the example requires Git, Docker, and Docker Compose.

6.2. Run the example

Clone the GitHub repository and execute run.sh in the docker directory:

git clone https://github.com/redis-field-engineering/redis-kafka-connect.git
cd redis-kafka-connect/docker
./run.sh

This will:

  • Run docker-compose up

  • Wait for Redis, Kafka, and Kafka Connect to be ready

  • Register the Confluent Datagen Connector

  • Register the Redis Kafka Sink Connector

  • Register the Redis Kafka Source Connector

  • Publish some events to Kafka via the Datagen connector

  • Write the events to Redis

  • Send messages to a Redis stream

  • Write the Redis stream messages back into Kafka

Once running, examine the topics in the Kafka control center: http://localhost:9021/

  • The pageviews topic should contain the 10 simple documents added, each similar to:

{
  "viewtime": 81,
  "pageid": "Page_1",
  "userid": "User_8"
}

  • The pageviews stream should contain the 10 change events.

Examine the stream in Redis:

docker-compose exec redis /usr/local/bin/redis-cli
xread COUNT 10 STREAMS pageviews 0

Messages added to the mystream Redis stream will show up in the mystream Kafka topic.

7. Resources

7.1. Kafka

https://kafka.apache.org

7.2. Kafka Connect

https://kafka.apache.org/documentation/#connect